diff --git a/README-CN.md b/README-CN.md index 4420e0bffd269eccd9672a61d26f784411c71412..dae07a77d498007b6a2950a97d927cc7de1f574e 100644 --- a/README-CN.md +++ b/README-CN.md @@ -19,7 +19,7 @@ # TDengine 简介 -TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。TDengine 的主要优势如下: +TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。与其他时序数据库相比,TDengine 的主要优势如下: - 高性能:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。 @@ -41,7 +41,9 @@ TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series TDengine 目前可以在 Linux、 Windows 等平台上安装和运行。任何 OS 的应用也可以选择 taosAdapter 的 RESTful 接口连接服务端 taosd。CPU 支持 X64/ARM64,后续会支持 MIPS64、Alpha64、ARM32、RISC-V 等 CPU 架构。 -用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。本快速指南仅适用于通过源码安装。 +用户可根据需求选择通过源码、[容器](https://docs.taosdata.com/3.0/get-started/docker/)、[安装包](https://docs.taosdata.com/3.0/get-started/package/)或[Kubenetes](https://docs.taosdata.com/3.0/deployment/k8s/)来安装。本快速指南仅适用于通过源码安装。 + +TDengine 还提供一组辅助工具软件 taosTools,目前它包含 taosBenchmark(曾命名为 taosdemo)和 taosdump 两个软件。默认 TDengine 编译不包含 taosTools, 您可以在编译 TDengine 时使用`cmake .. -DBUILD_TOOLS=true` 来同时编译 taosTools。 ## 安装工具 @@ -53,10 +55,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev #### 为 taos-tools 安装编译需要的软件 -taosTools 是用于 TDengine 的辅助工具软件集合。目前它包含 taosBenchmark(曾命名为 taosdemo)和 taosdump 两个软件。 - -默认 TDengine 编译不包含 taosTools。您可以在编译 TDengine 时使用`cmake .. -DBUILD_TOOLS=true` 来同时编译 taosTools。 - 为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件: ```bash @@ -104,7 +102,7 @@ sudo yum config-manager --set-enabled Powertools ### 设置 golang 开发环境 -TDengine 包含数个使用 Go 语言开发的组件,请参考 golang.org 官方文档设置 go 开发环境。 +TDengine 包含数个使用 Go 语言开发的组件,比如taosAdapter, 请参考 golang.org 官方文档设置 go 开发环境。 请使用 1.14 及以上版本。对于中国用户,我们建议使用代理来加速软件包下载。 @@ -113,7 +111,7 @@ go env -w GO111MODULE=on go env -w GOPROXY=https://goproxy.cn,direct ``` -默认情况下,内嵌的 http 服务仍然可以从 TDengine 源码构建。当然您也可以使用以下命令选择构建 taosAdapter 作为 RESTful 接口的服务。 +缺省是不会构建 taosAdapter, 但您可以使用以下命令选择构建 taosAdapter 作为 RESTful 接口的服务。 ``` cmake .. -DBUILD_HTTP=false diff --git a/README.md b/README.md index 8ef90b69c01c9f19d3525d8aba1bf54b92b245cb..0cb16c18c473a504c3a15c8292ba05f5b82eb962 100644 --- a/README.md +++ b/README.md @@ -20,28 +20,30 @@ English | [简体中文](README-CN.md) | We are hiring, check [here](https://tde # What is TDengine? -TDengine is an open source, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.: +TDengine is an open source, high performance, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.: - High-Performance: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression. - Simplified Solution: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly. -- Cloud Native: Through native distributed design, sharding and partitioning, separation of compute and storage, RAFT, support for kubernetes deployment and full observability, TDengine can be deployed on public, private or hybrid clouds. - -- Open Source: TDengine’s core modules, including cluster feature, are all available under open source licenses. It has gathered 18.8k stars on GitHub, an active developer community, and over 137k running instances worldwide. +- Cloud Native: Through native distributed design, sharding and partitioning, separation of compute and storage, RAFT, support for kubernetes deployment and full observability, TDengine is a cloud native Time-Series Database and can be deployed on public, private or hybrid clouds. - Ease of Use: For administrators, TDengine significantly reduces the effort to deploy and maintain. For developers, it provides a simple interface, simplified solution and seamless integrations for third party tools. For data users, it gives easy data access. - Easy Data Analytics: Through super tables, storage and compute separation, data partitioning by time interval, pre-computation and other means, TDengine makes it easy to explore, format, and get access to data in a highly efficient way. +- Open Source: TDengine’s core modules, including cluster feature, are all available under open source licenses. It has gathered 18.8k stars on GitHub, an active developer community, and over 137k running instances worldwide. + # Documentation -For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com) (中文版请点击[这里](https://docs.taosdata.com)) +For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com) ([中文版](https://docs.taosdata.com)) # Building At the moment, TDengine server supports running on Linux and Windows systems. You can choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) or build it from the source code. This quick guide is for installation from the source only. +We provide a few useful tools such as taosBenchmark (was named taosdemo) and taosdump. They were part of TDengine. By default, TDengine compiling does not include taosTools. You can use 'cmake .. -DBUILD_TOOLS=true' to make them be compiled with TDengine. + To build TDengine, use [CMake](https://cmake.org/) 3.0.2 or higher versions in the project directory. ## Install build dependencies @@ -54,9 +56,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev #### Install build dependencies for taosTools -We provide a few useful tools such as taosBenchmark (was named taosdemo) and taosdump. They were part of TDengine. From TDengine 2.4.0.0, taosBenchmark and taosdump were not released together with TDengine. -By default, TDengine compiling does not include taosTools. You can use 'cmake .. -DBUILD_TOOLS=true' to make them be compiled with TDengine. - To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed. ```bash @@ -90,7 +89,7 @@ Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/ ### Setup golang environment -TDengine includes a few components developed by Go language. Please refer to golang.org official documentation for golang environment setup. +TDengine includes a few components like taosAdapter developed by Go language. Please refer to golang.org official documentation for golang environment setup. Please use version 1.14+. For the user in China, we recommend using a proxy to accelerate package downloading. @@ -101,7 +100,7 @@ go env -w GOPROXY=https://goproxy.cn,direct ### Setup rust environment -TDengine includees a few compoments developed by Rust language. Please refer to rust-lang.org official documentation for rust environment setup. +TDengine includes a few compoments developed by Rust language. Please refer to rust-lang.org official documentation for rust environment setup. ## Get the source codes @@ -140,14 +139,7 @@ cmake .. -DBUILD_TOOLS=true make ``` -Note TDengine 2.3.x.0 and later use a component named 'taosAdapter' to play http daemon role by default instead of the http daemon embedded in the early version of TDengine. The taosAdapter is programmed by go language. If you pull TDengine source code to the latest from an existing codebase, please execute 'git submodule update --init --recursive' to pull taosAdapter source code. Please install go language version 1.14 or above for compiling taosAdapter. If you meet difficulties regarding 'go mod', especially you are from China, you can use a proxy to solve the problem. - -``` -go env -w GO111MODULE=on -go env -w GOPROXY=https://goproxy.cn,direct -``` - -The embedded http daemon still be built from TDengine source code by default. Or you can use the following command to choose to build taosAdapter. +Note TDengine 2.3.x.0 and later use a component named 'taosAdapter' to play http daemon role. If you pull TDengine source code to the latest from an existing codebase, please execute 'git submodule update --init --recursive' to pull taosAdapter source code, and use the following command to choose to build taosAdapter. ``` cmake .. -DBUILD_HTTP=false @@ -172,7 +164,7 @@ cmake .. -DCPUTYPE=aarch64 && cmake --build . ### On Windows platform If you use the Visual Studio 2013, please open a command window by executing "cmd.exe". -Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. +Please specify "amd64" for 64 bits Windows or specify "x86" for 32 bits Windows when you execute vcvarsall.bat. ```cmd mkdir debug && cd debug @@ -184,7 +176,7 @@ nmake If you use the Visual Studio 2019 or 2017: please open a command window by executing "cmd.exe". -Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. +Please specify "x64" for 64 bits Windows or specify "x86" for 32 bits Windows when you execute vcvarsall.bat. ```cmd mkdir debug && cd debug @@ -237,19 +229,6 @@ taos If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown. -### Install TDengine by apt-get - -If you use Debian or Ubuntu system, you can use 'apt-get' command to install TDengine from official repository. Please use following commands to setup: - -``` -wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - -echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list -[Optional] echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list -sudo apt-get update -apt-cache policy tdengine -sudo apt-get install tdengine -``` - ## On Windows platform After building successfully, TDengine can be installed by: diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 989f90a150e8669738e1ce9f7795631c1dab30a5..3a6eb3c25a553d588c9eb7eb703c6e4dd3da53db 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 43924b8 + GIT_TAG 53a0103 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/05-get-started/01-docker.md b/docs/en/05-get-started/01-docker.md index 08d897ec077124b5fb924a847d782cd51bd75e96..14f5a8800072971a2ffa8550c838212d7b6a9907 100644 --- a/docs/en/05-get-started/01-docker.md +++ b/docs/en/05-get-started/01-docker.md @@ -32,19 +32,6 @@ docker exec -it bash 然后就可以执行相关的 Linux 命令操作和访问 TDengine -:::info - -Docker 工具自身的下载请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)。 - -安装完毕后可以在命令行终端查看 Docker 版本。如果版本号正常输出,则说明 Docker 环境已经安装成功。 - -```bash -$ docker -v -Docker version 20.10.3, build 48d30b5 -``` - -::: - ## 运行 TDengine CLI 进入容器,执行 taos @@ -113,4 +100,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); ## 其它 -更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) \ No newline at end of file +更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py new file mode 100644 index 0000000000000000000000000000000000000000..1f6da3d1b6690ab12527c1810286ba22a7688851 --- /dev/null +++ b/docs/examples/python/tmq_example.py @@ -0,0 +1,59 @@ +import taos +from taos.tmq import * + +conn = taos.connect() + +# create database +conn.execute("drop database if exists py_tmq") +conn.execute("create database if not exists py_tmq vgroups 2") + +# create table and stables +conn.select_db("py_tmq") +conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") +conn.execute("create table if not exists tb1 using stb1 tags(1)") +conn.execute("create table if not exists tb2 using stb1 tags(2)") +conn.execute("create table if not exists tb3 using stb1 tags(3)") + +# create topic +conn.execute("drop topic if exists topic_ctb_column") +conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") + +# set consumer configure options +conf = TaosTmqConf() +conf.set("group.id", "tg2") +conf.set("td.connect.user", "root") +conf.set("td.connect.pass", "taosdata") +conf.set("enable.auto.commit", "true") +conf.set("msg.with.table.name", "true") + +def tmq_commit_cb_print(tmq, resp, offset, param=None): + print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") + +conf.set_auto_commit_cb(tmq_commit_cb_print, None) + +# build consumer +tmq = conf.new_consumer() + +# build topic list +topic_list = TaosTmqList() +topic_list.append("topic_ctb_column") + +# subscribe consumer +tmq.subscribe(topic_list) + +# check subscriptions +sub_list = tmq.subscription() +print("subscribed topics: ",sub_list) + +# start subscribe +while 1: + res = tmq.poll(1000) + if res: + topic = res.get_topic_name() + vg = res.get_vgroup_id() + db = res.get_db_name() + print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") + for row in res: + print(row) + tb = res.get_table_name() + print(f"from table: {tb}") diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 66ad82adb8f8a8b11049d90b10be21c0f527123e..814784b649ecbaf32ec93d6b26cb40f2a98e82d8 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -3,7 +3,7 @@ sidebar_label: Docker title: 通过 Docker 快速体验 TDengine --- :::info -如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ::: 本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。 @@ -32,18 +32,7 @@ docker exec -it bash 然后就可以执行相关的 Linux 命令操作和访问 TDengine -:::info - -Docker 工具自身的下载请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)。 - -安装完毕后可以在命令行终端查看 Docker 版本。如果版本号正常输出,则说明 Docker 环境已经安装成功。 - -```bash -$ docker -v -Docker version 20.10.3, build 48d30b5 -``` - -::: +注: Docker 工具自身的下载和使用请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)。 ## 运行 TDengine CLI @@ -109,4 +98,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); ## 其它 -更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) \ No newline at end of file +更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index c03a25ef5832df5e8e1fd5493189055ba5a01819..63698aab505a4d8d490f75cfb619ef2e069aaca7 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -46,8 +46,8 @@ apt-get 方式只适用于 Debian 或 Ubuntu 系统 -1、从官网下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; -2、进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令: +1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; +2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令: ```bash sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb @@ -57,8 +57,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb -1、从官网下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm; -2、进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令: +1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm; +2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令: ```bash sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm @@ -68,8 +68,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm -1、从官网下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; -2、进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: +1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; +2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: ```bash tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz @@ -89,7 +89,9 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 -TODO + +1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; +2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 @@ -152,14 +154,14 @@ systemctl 命令汇总: -TODO +安装后,在 C:\TDengine 目录下,运行 taosd.exe 来启动 TDengine 服务进程。 ## TDengine 命令行 (CLI) -为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可。 +为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可,也可以在安装有 TDengine 的 Windows 终端的 C:\TDengine 目录下,运行 taos.exe 来启动 TDengine 命令行。 ```bash taos diff --git a/docs/zh/07-develop/04-query-data/index.mdx b/docs/zh/07-develop/04-query-data/index.mdx index eecda92744e12934f09ea8c66218f78da9a66146..68f49d9f2b36fce83dc76e43e36f1049ae3de18d 100644 --- a/docs/zh/07-develop/04-query-data/index.mdx +++ b/docs/zh/07-develop/04-query-data/index.mdx @@ -43,7 +43,7 @@ Query OK, 2 row(s) in set (0.001100s) 为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。 -具体的查询语法请看 [TAOS SQL 的数据查询](/taos-sql/select) 章节。 +具体的查询语法请看 [TAOS SQL 的数据查询](../../taos-sql/select) 章节。 ## 多表聚合查询 @@ -74,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - Query OK, 1 row(s) in set (0.002136s) ``` -在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。 +在 [TAOS SQL 的数据查询](../../taos-sql/select) 一章,查询类操作都会注明是否支持超级表。 ## 降采样查询、插值 @@ -121,7 +121,7 @@ Query OK, 5 row(s) in set (0.001521s) 如果一个时间间隔里,没有采集的数据,TDengine 还提供插值计算的功能。 -语法规则细节请见 [TAOS SQL 的按时间窗口切分聚合](/taos-sql/interval) 章节。 +语法规则细节请见 [TAOS SQL 的按时间窗口切分聚合](../../taos-sql/distinguished) 章节。 ## 示例代码 diff --git a/docs/zh/07-develop/06-stream.md b/docs/zh/07-develop/06-stream.md index dbf86840080f9bfdb95a3c67c4fbdd721ed3e946..5217fa8389281340029cbe768b247f0025c7a198 100644 --- a/docs/zh/07-develop/06-stream.md +++ b/docs/zh/07-develop/06-stream.md @@ -29,100 +29,77 @@ stream_options: { 首先准备数据,完成建库、建一张超级表和多张子表操作 ```sql -drop database if exists stream_db; -create database stream_db; +DROP DATABASE IF EXISTS power; +CREATE DATABASE power; +USE power; -create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int); +CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); -create table stream_db.d1001 using stream_db.meters tags("beijing", 1); -create table stream_db.d1002 using stream_db.meters tags("guangzhou", 2); -create table stream_db.d1003 using stream_db.meters tags("shanghai", 3); +CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2); +CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3); +CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2); +CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3); ``` ### 创建流 ```sql -create stream stream1 into stream_db.stream1_output_stb as select _wstart as start, _wend as end, max(current) as max_current from stream_db.meters where voltage <= 220 and ts > now - 12h interval (1h); +create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 and ts > now - 12h interval (1h); ``` ### 写入数据 ```sql -insert into stream_db.d1001 values(now-14h, 10.3, 210); -insert into stream_db.d1001 values(now-13h, 13.5, 216); -insert into stream_db.d1001 values(now-12h, 12.5, 219); -insert into stream_db.d1002 values(now-11h, 14.7, 221); -insert into stream_db.d1002 values(now-10h, 10.5, 218); -insert into stream_db.d1002 values(now-9h, 11.2, 220); -insert into stream_db.d1003 values(now-8h, 11.5, 217); -insert into stream_db.d1003 values(now-7h, 12.3, 227); -insert into stream_db.d1003 values(now-6h, 12.3, 215); +insert into d1001 values(now-13h, 10.30000, 219, 0.31000); +insert into d1001 values(now-11h, 12.60000, 218, 0.33000); +insert into d1001 values(now-10h, 12.30000, 221, 0.31000); +insert into d1002 values(now-9h, 10.30000, 218, 0.25000); +insert into d1003 values(now-8h, 11.80000, 221, 0.28000); +insert into d1003 values(now-7h, 13.40000, 223, 0.29000); +insert into d1004 values(now-6h, 10.80000, 223, 0.29000); +insert into d1004 values(now-5h, 11.50000, 221, 0.35000); ``` ### 查询以观查结果 + ```sql -taos> select * from stream_db.stream1_output_stb; - start | end | max_current | group_id | -=================================================================================================== - 2022-08-09 14:00:00.000 | 2022-08-09 15:00:00.000 | 10.50000 | 0 | - 2022-08-09 15:00:00.000 | 2022-08-09 16:00:00.000 | 11.20000 | 0 | - 2022-08-09 16:00:00.000 | 2022-08-09 17:00:00.000 | 11.50000 | 0 | - 2022-08-09 18:00:00.000 | 2022-08-09 19:00:00.000 | 12.30000 | 0 | -Query OK, 4 rows in database (0.012033s) +taos> select start, end, max_current from current_stream_output_stb; + start | end | max_current | +=========================================================================== + 2022-08-12 04:00:00.000 | 2022-08-12 05:00:00.000 | 12.60000 | + 2022-08-12 06:00:00.000 | 2022-08-12 07:00:00.000 | 10.30000 | +Query OK, 2 rows in database (0.009580s) ``` ## 示例二 -某运营商平台要采集机房所有服务器的系统资源指标,包含 cpu、内存、网络延迟等,采集后需要对数据进行四舍五入运算,将地域和服务器名以下划线拼接,然后将结果按时间排序并以服务器名分组输出到新的数据表中。 -### 创建 DB 和原始数据表 -首先准备数据,完成建库、建一张超级表和多张子表操作 +依然以示例一中的数据为基础,我们已经采集到了每个智能电表的电流和电压数据,现在要求出功率,并将地域和电表名以符号 "." 拼接,然后以电表名称分组输出到新的数据表中。 -```sql -drop database if exists stream_db; -create database stream_db; - -create stable stream_db.idc (ts timestamp, cpu float, mem float, latency float) TAGS (location varchar(64), groupId int); +### 创建 DB 和原始数据表 -create table stream_db.server01 using stream_db.idc tags("beijing", 1); -create table stream_db.server02 using stream_db.idc tags("shanghai", 2); -create table stream_db.server03 using stream_db.idc tags("beijing", 2); -create table stream_db.server04 using stream_db.idc tags("tianjin", 3); -create table stream_db.server05 using stream_db.idc tags("shanghai", 1); -``` +参考示例一 [创建 DB 和原始数据表](#创建-db-和原始数据表) ### 创建流 ```sql -create stream stream2 into stream_db.stream2_output_stb as select ts, concat_ws("_", location, tbname) as server_location, round(cpu) as cpu, round(mem) as mem, round(latency) as latency from stream_db.idc partition by tbname order by ts; +create stream power_stream into power_stream_output_stb as select ts, concat_ws(".", location, tbname) as meter_location, current*voltage as meter_power from meters partition by tbname; ``` ### 写入数据 -```sql -insert into stream_db.server01 values(now-14h, 50.9, 654.8, 23.11); -insert into stream_db.server01 values(now-13h, 13.5, 221.2, 11.22); -insert into stream_db.server02 values(now-12h, 154.7, 218.3, 22.33); -insert into stream_db.server02 values(now-11h, 120.5, 111.5, 5.55); -insert into stream_db.server03 values(now-10h, 101.5, 125.6, 5.99); -insert into stream_db.server03 values(now-9h, 12.3, 165.6, 6.02); -insert into stream_db.server04 values(now-8h, 160.9, 120.7, 43.51); -insert into stream_db.server04 values(now-7h, 240.9, 520.7, 54.55); -insert into stream_db.server05 values(now-6h, 190.9, 320.7, 55.43); -insert into stream_db.server05 values(now-5h, 110.9, 600.7, 35.54); -``` + +参考示例一 [写入数据](#写入数据) + ### 查询以观查结果 ```sql -taos> select ts, server_location, cpu, mem, latency from stream_db.stream2_output_stb; - ts | server_location | cpu | mem | latency | -================================================================================================================================ - 2022-08-09 21:24:56.785 | beijing_server01 | 51.00000 | 655.00000 | 23.00000 | - 2022-08-09 22:24:56.795 | beijing_server01 | 14.00000 | 221.00000 | 11.00000 | - 2022-08-09 23:24:56.806 | shanghai_server02 | 155.00000 | 218.00000 | 22.00000 | - 2022-08-10 00:24:56.815 | shanghai_server02 | 121.00000 | 112.00000 | 6.00000 | - 2022-08-10 01:24:56.826 | beijing_server03 | 102.00000 | 126.00000 | 6.00000 | - 2022-08-10 02:24:56.838 | beijing_server03 | 12.00000 | 166.00000 | 6.00000 | - 2022-08-10 03:24:56.846 | tianjin_server04 | 161.00000 | 121.00000 | 44.00000 | - 2022-08-10 04:24:56.853 | tianjin_server04 | 241.00000 | 521.00000 | 55.00000 | - 2022-08-10 05:24:56.866 | shanghai_server05 | 191.00000 | 321.00000 | 55.00000 | - 2022-08-10 06:24:57.301 | shanghai_server05 | 111.00000 | 601.00000 | 36.00000 | -Query OK, 10 rows in database (0.022950s) -``` - +taos> select ts, meter_location, meter_power from power_stream_output_stb; + ts | meter_location | meter_power | +======================================================================================= + 2022-08-12 07:44:47.817 | California.SanFrancisco.d1002 | 2245.400041580 | + 2022-08-12 08:44:47.826 | California.LosAngeles.d1003 | 2607.800042152 | + 2022-08-12 09:44:47.833 | California.LosAngeles.d1003 | 2988.199914932 | + 2022-08-12 03:44:47.791 | California.SanFrancisco.d1001 | 2255.700041771 | + 2022-08-12 05:44:47.800 | California.SanFrancisco.d1001 | 2746.800083160 | + 2022-08-12 06:44:47.809 | California.SanFrancisco.d1001 | 2718.300042152 | + 2022-08-12 10:44:47.840 | California.LosAngeles.d1004 | 2408.400042534 | + 2022-08-12 11:44:48.379 | California.LosAngeles.d1004 | 2541.500000000 | +Query OK, 8 rows in database (0.014788s) +``` \ No newline at end of file diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 459b88085f9910bca4840192223595a47a0bf2b9..25d468cad3658190f6b9409637543061ac22f958 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -4,6 +4,17 @@ description: "数据订阅与推送服务。写入到 TDengine 中的时序数 title: 数据订阅 --- +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import Java from "./_sub_java.mdx"; +import Python from "./_sub_python.mdx"; +import Go from "./_sub_go.mdx"; +import Rust from "./_sub_rust.mdx"; +import Node from "./_sub_node.mdx"; +import CSharp from "./_sub_cs.mdx"; +import CDemo from "./_sub_c.mdx"; + + 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 @@ -51,7 +62,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); ``` -这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到。 +这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。 ## 写入数据 @@ -62,13 +73,9 @@ drop database if exists tmqdb; create database tmqdb; create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16)); create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0"); -create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1"); -create table tmqdb.ctb2 using tmqdb.stb tags(2, "subtable2"); -create table tmqdb.ctb3 using tmqdb.stb tags(3, "subtable3"); +create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1"); insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); -insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22'); -insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33'); ``` ## 创建topic: @@ -130,7 +137,6 @@ TMQ支持多种订阅类型: tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); - return tmq; ``` 上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。 @@ -143,66 +149,23 @@ TMQ支持多种订阅类型: ```sql tmq_list_t* topicList = tmq_list_new(); tmq_list_append(topicList, "topicName"); - return topicList; ``` ## 启动订阅并开始消费 -```sql +``` /* 启动订阅 */ tmq_subscribe(tmq, topicList); tmq_list_destroy(topicList); /* 循环poll消息 */ - int32_t totalRows = 0; - int32_t msgCnt = 0; - int32_t timeOut = 5000; while (running) { TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut); - if (tmqmsg) { - msgCnt++; - totalRows += msg_process(tmqmsg); - taos_free_result(tmqmsg); - } else { - break; - } - } - - fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + msg_process(tmqmsg); + } ``` -这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析: - -```sql - static int32_t msg_process(TAOS_RES* msg) { - char buf[1024]; - int32_t rows = 0; - - const char* topicName = tmq_get_topic_name(msg); - const char* dbName = tmq_get_db_name(msg); - int32_t vgroupId = tmq_get_vgroup_id(msg); - - printf("topic: %s\n", topicName); - printf("db: %s\n", dbName); - printf("vgroup id: %d\n", vgroupId); - - while (1) { - TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; - - TAOS_FIELD* fields = taos_fetch_fields(msg); - int32_t numOfFields = taos_field_count(msg); - int32_t* length = taos_fetch_lengths(msg); - int32_t precision = taos_result_precision(msg); - const char* tbName = tmq_get_table_name(msg); - rows++; - taos_print_row(buf, row, fields, numOfFields); - printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf); - } - - return rows; -} -``` +这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。 ## 结束消费 @@ -243,4 +206,44 @@ TMQ支持多种订阅类型: show subscriptions; ``` +## 示例代码 + +本节展示各种语言的示例代码。 + + + + +```c +{{#include examples/c/tmq.c}} +``` + + + + + + + + + + + + + + + + +```python +{{#include docs/examples/python/tmq_example.py}} +``` + + + + + + + + + + + diff --git a/docs/zh/12-taos-sql/25-grant.md b/docs/zh/12-taos-sql/25-grant.md index 0c290350cc155e975e5a817c991bebc74944cd04..c41a3fcfc9ee42e56e48082da5b6420073d92cdf 100644 --- a/docs/zh/12-taos-sql/25-grant.md +++ b/docs/zh/12-taos-sql/25-grant.md @@ -8,7 +8,7 @@ title: 权限管理 ## 创建用户 ```sql -CREATE USER use_name PASS password; +CREATE USER use_name PASS 'password'; ``` 创建用户。 @@ -91,4 +91,4 @@ priv_level : { ``` -收回对用户的授权。 \ No newline at end of file +收回对用户的授权。 diff --git a/docs/zh/14-reference/03-connector/01-error-code.md b/docs/zh/14-reference/03-connector/_01-error-code.md similarity index 100% rename from docs/zh/14-reference/03-connector/01-error-code.md rename to docs/zh/14-reference/03-connector/_01-error-code.md diff --git a/docs/zh/14-reference/03-connector/java.mdx b/docs/zh/14-reference/03-connector/java.mdx index 7a107bd04dbf18173ef650bab2804617668b682e..e33d09c1ce69e0c96dedac198b73f425c531b4cc 100644 --- a/docs/zh/14-reference/03-connector/java.mdx +++ b/docs/zh/14-reference/03-connector/java.mdx @@ -358,7 +358,7 @@ JDBC 连接器可能报错的错误码包括 3 种:JDBC driver 本身的报错 具体的错误码请参考: - [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java) -- [TDengine_ERROR_CODE](../error-code) + ### 通过参数绑定写入数据 diff --git a/docs/zh/17-operation/01-pkg-install.md b/docs/zh/17-operation/01-pkg-install.md index 533a01542a37cf1fdc9bec8d2e961f4e02adc46c..5e4cc931309ea8bf45b1840a7da04e336434bdab 100644 --- a/docs/zh/17-operation/01-pkg-install.md +++ b/docs/zh/17-operation/01-pkg-install.md @@ -86,7 +86,7 @@ TDengine is removed successfully! -TODO +在 C:\TDengine 目录下,通过运行 unins000.exe 卸载程序来卸载 TDengine。 diff --git a/docs/zh/21-tdinternal/01-arch.md b/docs/zh/21-tdinternal/01-arch.md index e60debc87fe37b11d185f3e626e16337cb18a2fd..a910c584d6ba47844d51e45e5010581075a72fb6 100644 --- a/docs/zh/21-tdinternal/01-arch.md +++ b/docs/zh/21-tdinternal/01-arch.md @@ -162,8 +162,6 @@ Vnode 会保持一个数据版本号(version),对内存数据进行持久 一个 vnode 启动时,角色(leader、follower)是不定的,数据是处于未同步状态,它需要与虚拟节点组内其他节点建立 TCP 连接,并互相交换 status,按照标准的 raft 一致性算法完成选主。 -更多的关于数据复制的流程,请见[《TDengine 3.0 数据复制模块设计》](/tdinternal/replica/)。 - ### 同步复制 对于数据一致性要求更高的场景,异步数据复制提供的最终一致性无法满足要求。因此 TDengine 提供同步复制的机制供用户选择。在创建数据库时,除指定副本数 replica 之外,用户还需要指定新的参数 strict。如果 strict 等于 1,它表示每次 leader 转发给副本时,需要等待半数以上副本达成一致后,才能通知应用,数据在 follower 已经写入成功。如果在一定的时间内,得不到半数以上副本的确认,leader vnode 将返回错误给应用。 @@ -241,15 +239,16 @@ dataDir /mnt/data6 2 0 ## 数据查询 -TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、mnode 节点协同完成。 - -### 单表查询 - -SQL 语句的解析和校验工作在客户端完成。解析 SQL 语句并生成抽象语法树(Abstract Syntax Tree,AST),然后对其进行校验和检查。以及向管理节点(mnode)请求查询中指定表的元数据信息(table metadata)。 +TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点公共分担查询和计算任务。 -根据元数据信息中的 End Point 信息,将查询请求序列化后发送到该表所在的数据节点(dnode)。dnode 接收到查询请求后,识别出该查询请求指向的虚拟节点(vnode),将消息转发到 vnode 的查询执行队列。vnode 的查询执行线程建立基础的查询执行环境,并立即返回该查询请求,同时开始执行该查询。 +### 查询基本流程 -客户端在获取查询结果的时候,dnode 的查询执行队列中的工作线程会等待 vnode 执行线程执行完成,才能将查询结果返回到请求的客户端。 +1. 客户端解析输入 SQL 语句并生成抽象语法树(Abstract Syntax Tree,AST),然后根据元数据信息对其进行校验和检查。在此期间,元数据管理模块(Catalog)会向管理节点(mnode)或 vnode 请求查询中指定库和表的元数据信息(table metadata)。 +2. 在通过校验检查后,客户端将生成分布式的查询计划并对查询计划进行优化处理。 +3. 客户端根据配置的查询策略进行任务调度处理,一个查询子任务会根据其数据亲缘关系或负载信息调度到某个 vnode 或 qnode 所属的数据节点(dnode)进行处理。 +4. dnode 接收到查询请求后,识别出该查询请求指向的虚拟节点(vnode)或查询节点(qnode),将消息转发到 vnode 或 qnode 的查询执行队列。 +5. vnode 或 qnode 的查询执行线程建立基础的查询执行环境,并立即执行该查询,在得到部分可获取查询结果后通知客户端。 +6. 客户端将启动下级查询任务或直接获取查询结果。 ### 按时间轴聚合、降采样、插值 @@ -279,12 +278,14 @@ TDengine 对每个数据采集点单独建表,但在实际应用中经常需
图 5 多表聚合查询原理图
-1. 应用将一个查询条件发往系统; -2. taosc 将超级表的名字发往 meta node(管理节点); -3. 管理节点将超级表所拥有的 vnode 列表发回 taosc; -4. taosc 将计算的请求连同标签过滤条件发往这些 vnode 对应的多个数据节点; -5. 每个 vnode 先在内存里查找出自己节点里符合标签过滤条件的表的集合,然后扫描存储的时序数据,完成相应的聚合计算,将结果返回给 taosc; -6. taosc 将多个数据节点返回的结果做最后的聚合,将其返回给应用。 +1. 客户端从 mnode 获取库和表的元数据信息; +2. mnode 返回请求的元数据信息; +3. 客户端向超级表所属的每个 vnode 发送查询请求; +4. vnode 启动本地查询,在获得查询结果后返回查询响应; +5. 客户端向聚合节点 (在本例中为 qnode)发送查询请求; +6. qnode 向每个 vnode 节点发送数据请求消息来拉取数据; +7. vnode 返回本节点的查询计算结果; +8. qnode 完成多节点数据聚合后将最终查询结果返回给客户端; 由于 TDengine 在 vnode 内将标签数据与时序数据分离存储,通过在内存里过滤标签数据,先找到需要参与聚合操作的表的集合,将需要扫描的数据集大幅减少,大幅提升聚合计算速度。同时,由于数据分布在多个 vnode/dnode,聚合计算操作在多个 vnode 里并发进行,又进一步提升了聚合的速度。 对普通表的聚合函数以及绝大部分操作都适用于超级表,语法完全一样,细节请看 TAOS SQL。 diff --git a/docs/zh/21-tdinternal/multi_tables.webp b/docs/zh/21-tdinternal/multi_tables.webp index 8f649e34a3a62d1b11b4403b2e743ff6b5e47be2..481020a5f718e2ad3f1265b2785ddff8ad9ee292 100644 Binary files a/docs/zh/21-tdinternal/multi_tables.webp and b/docs/zh/21-tdinternal/multi_tables.webp differ diff --git a/include/util/tsched.h b/include/util/tsched.h index 3bf740f5285bc7559eea0f81f88462db40b28705..347cacd19185b0891deadfdbdc6a0b42b8a71d18 100644 --- a/include/util/tsched.h +++ b/include/util/tsched.h @@ -17,6 +17,7 @@ #define _TD_UTIL_SCHED_H_ #include "os.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -30,6 +31,24 @@ typedef struct SSchedMsg { void *thandle; } SSchedMsg; + +typedef struct { + char label[TSDB_LABEL_LEN]; + tsem_t emptySem; + tsem_t fullSem; + TdThreadMutex queueMutex; + int32_t fullSlot; + int32_t emptySlot; + int32_t queueSize; + int32_t numOfThreads; + TdThread *qthread; + SSchedMsg *queue; + int8_t stop; + void *pTmrCtrl; + void *pTimer; +} SSchedQueue; + + /** * Create a thread-safe ring-buffer based task queue and return the instance. A thread * pool will be created to consume the messages in the queue. @@ -38,7 +57,7 @@ typedef struct SSchedMsg { * @param label the label of the queue * @return the created queue scheduler */ -void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label); +void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched); /** * Create a thread-safe ring-buffer based task queue and return the instance. diff --git a/source/client/src/TMQConnector.c b/source/client/src/TMQConnector.c index 3755a591e3191d725b53741c34606fca418fb4c6..17d3a212c482c3462e542721d7d57f516250ff13 100644 --- a/source/client/src/TMQConnector.c +++ b/source/client/src/TMQConnector.c @@ -322,7 +322,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp( (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields); - int32_t len = *(int32_t *)data; + int32_t len = *(int32_t *)(((char *)data) + 4); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len)); return JNI_SUCCESS; } diff --git a/source/client/src/TSDBJNIConnector.c b/source/client/src/TSDBJNIConnector.c index 37041da6950999f798a0064b1f18200a946b2d43..b5a6ebadeec74e6565fd5225144cd139f6c9aaab 100644 --- a/source/client/src/TSDBJNIConnector.c +++ b/source/client/src/TSDBJNIConnector.c @@ -592,7 +592,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNI (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields); - int32_t len = *(int32_t *)data; + int32_t len = *(int32_t *)(((char *)data) + 4); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len)); return JNI_SUCCESS; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index f72b69a7de243d27d202a08fbb21561a3e1b4224..505a1384efdd7296751c6d29700b0888aa04543e 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -87,10 +87,11 @@ typedef struct { typedef struct { tsem_t syncSem; int64_t sync; - bool standby; SReplica replica; int32_t errCode; int32_t transId; + SRWLatch lock; + int8_t standby; int8_t leaderTransferFinish; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 05d0fe66c3a010677017007953c5c4c639364808..9499c90c57c59e3600c701668dd17671f641d919 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -238,7 +238,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { } else { memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); } - if (mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) { + if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 37d5aeb62d73d523031cf5049b642c5a0645ca15..03e5c2b3a2be91c259230813d564c05f035cd107 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); } + taosRLockLatch(&pMgmt->lock); if (transId <= 0) { + taosRUnLockLatch(&pMgmt->lock); mError("trans:%d, invalid commit msg", transId); } else if (transId == pMgmt->transId) { + taosRUnLockLatch(&pMgmt->lock); if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); } pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } else { + taosRUnLockLatch(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mDebug("trans:%d, execute in mnode which not leader", transId); @@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); + taosWLockLatch(&pMgmt->lock); if (pMgmt->transId == -1) { if (pMgmt->errCode != 0) { mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); @@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } + taosWUnLockLatch(&pMgmt->lock); } int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { @@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; mDebug("vgId:1, become follower"); - // clear old leader resource + taosWLockLatch(&pMnode->syncMgmt.lock); + if (pMnode->syncMgmt.transId != 0) { + pMnode->syncMgmt.transId = 0; + tsem_post(&pMnode->syncMgmt.syncSem); + } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } static void mndBecomeLeader(struct SSyncFSM *pFsm) { - SMnode *pMnode = pFsm->data; mDebug("vgId:1, become leader"); + SMnode *pMnode = pFsm->data; + + taosWLockLatch(&pMnode->syncMgmt.lock); + if (pMnode->syncMgmt.transId != 0) { + pMnode->syncMgmt.transId = 0; + tsem_post(&pMnode->syncMgmt.syncSem); + } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + taosInitRWLatch(&pMgmt->lock); + pMgmt->transId = 0; SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); @@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { memcpy(req.pCont, pRaw, req.contLen); pMgmt->errCode = 0; + taosWLockLatch(&pMgmt->lock); pMgmt->transId = transId; + taosWUnLockLatch(&pMgmt->lock); mTrace("trans:%d, will be proposed", pMgmt->transId); const bool isWeak = false; int32_t code = syncPropose(pMgmt->sync, &req, isWeak); + if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) { } void mndSyncStop(SMnode *pMnode) { + taosWLockLatch(&pMnode->syncMgmt.lock); if (pMnode->syncMgmt.transId != 0) { pMnode->syncMgmt.transId = 0; tsem_post(&pMnode->syncMgmt.syncSem); } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 43bb92ec235371033ea82542493a2bbe7928d427..700c6cf8a31928e8a18fde5da33e737a4f9a87d6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -308,7 +308,8 @@ struct SVnode { SSink* pSink; tsem_t canCommit; int64_t sync; - int32_t blockCount; + SRWLatch lock; + bool blocked; bool restored; tsem_t syncSem; SQHandle* pQuery; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 1ba74ac3be965a25893229b01437461cb6df3454..4ee5c4760c5ac6d27c6f9990bf0fb6cb342bce49 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.commitTerm = info.state.commitTerm; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; - pVnode->blockCount = 0; + taosInitRWLatch(&pVnode->lock); + pVnode->blocked = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7ac124fdd3e08383afdc3a949b4e03fcaacae83a..50d32f5f5e1d084eb66b47145c488ed35e63c759 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blockCount = 1; - tsem_wait(&pVnode->syncSem); + taosWLockLatch(&pVnode->lock); + if (!pVnode->blocked) { + vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + pVnode->blocked = true; + taosWUnLockLatch(&pVnode->lock); + tsem_wait(&pVnode->syncSem); + } else { + taosWUnLockLatch(&pVnode->lock); + } } } static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; - if (pVnode->blockCount) { + taosWLockLatch(&pVnode->lock); + if (pVnode->blocked) { vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blockCount = 0; + pVnode->blocked = false; tsem_post(&pVnode->syncSem); } + taosWUnLockLatch(&pVnode->lock); } } @@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { vDebug("vgId:%d, become follower", pVnode->config.vgId); // clear old leader resource + taosWLockLatch(&pVnode->lock); + if (pVnode->blocked) { + pVnode->blocked = false; + tsem_post(&pVnode->syncSem); + } + taosWUnLockLatch(&pVnode->lock); } static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index f279d87815897c7b637219edf98c4bfe89a23724..64ca85edf45ac515bd7728883c171b04c399d148 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1105,6 +1105,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SName* pName = ctgGetFetchName(ctx->pNames, pFetch); int32_t flag = pFetch->flag; int32_t* vgId = &pFetch->vgId; + bool taskDone = false; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1250,6 +1251,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu pOut->tbMeta = NULL; if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); + taskDone = true; } _return: @@ -1264,10 +1266,11 @@ _return: pRes->pRes = NULL; if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); + taskDone = true; } } - if (pTask->res) { + if (pTask->res && taskDone) { ctgHandleTaskEnd(pTask, code); } @@ -1354,6 +1357,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SCatalog* pCtg = pTask->pJob->pCtg; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + bool taskDone = false; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1377,6 +1381,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); + taskDone = true; } _return: @@ -1392,10 +1397,11 @@ _return: if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); + taskDone = true; } } - if (pTask->res) { + if (pTask->res && taskDone) { ctgHandleTaskEnd(pTask, code); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 930361419eadaeb7041b83c9cf5ae5c89e97e513..6935489ff4000aa37aed912769120713102ed4c1 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -581,6 +581,20 @@ _return: } int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { + char *p = strchr(dbFName, '.'); + if (p) { + ++p; + } else { + p = dbFName; + } + + if (IS_SYS_DBNAME(p)) { + *inCache = true; + *pass = true; + ctgDebug("sysdb %s, pass", dbFName); + return TSDB_CODE_SUCCESS; + } + SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user)); if (NULL == pUser) { ctgDebug("user not in cache, user:%s", user); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 7b81343358364cf33b7f99f25faac2fa2599b621..e28234ab7603248e7261829bcb59e44ba24491fe 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -916,7 +916,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo int32_t vgNum = taosHashGetSize(dbInfo->vgHash); if (vgNum <= 0) { ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); - CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } tableNameHashFp fp = NULL; @@ -931,6 +931,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo for (int32_t i = 0; i < tbNum; ++i) { vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); if (NULL == vgInfo) { + taosHashCancelIterate(dbInfo->vgHash, pIter); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -980,7 +981,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo if (NULL == p) { ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash)); - ASSERT(0); taosArrayDestroy(pVgList); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 977adcaa5e3872ff7aeea7209eca7e275ea07398..20396046ba5daa34c3caaf76796c2b8a3b06527c 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -101,12 +101,14 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { +/* uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) { qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, taosQueueItemSize(pDispatcher->pDataBlocks)); return false; } +*/ pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index be129cb6b488ca5625796ab65f1b6835c5cbe75d..12fed5a118d3fd0da28b1c60fdbaf71caace580f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1433,7 +1433,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u pAggInfo->groupId = groupId; } -static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) { +static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, + const int32_t* rowCellOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset); @@ -1613,7 +1614,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG if (!pbInfo->mergeResultBlock) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { - while(hasRemainResults(pGroupResInfo)) { + while (hasRemainResults(pGroupResInfo)) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; @@ -2062,10 +2063,10 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, - char** pNextStart) { + char** pNextStart) { if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - *pNextStart = (char*) blockDecode(pRes, pData); + *pNextStart = (char*)blockDecode(pRes, pData); } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); char* pStart = pData; @@ -2161,9 +2162,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - int32_t index = 0; - char* pStart = pRetrieveRsp->data; - while(index++ < pRetrieveRsp->numOfBlocks) { + int32_t index = 0; + char* pStart = pRetrieveRsp->data; + while (index++ < pRetrieveRsp->numOfBlocks) { SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart); if (code != 0) { @@ -2177,8 +2178,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" - " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb," + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + " execId:%d" + " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb," " completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, @@ -2186,9 +2189,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 - ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, - pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, + pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } taosMemoryFreeClear(pDataInfo->pRsp); @@ -3521,7 +3525,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -3562,7 +3566,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: destroyAggOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -4180,7 +4183,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, - pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); + pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; @@ -4195,7 +4199,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); + pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, + pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); @@ -4249,8 +4254,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ASSERT(0); } taosMemoryFree(ops); - - pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; return pOptr; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6778e97d7ad63e14dfc19091d12767ecbfc140e5..481215b502ca21704873375e8a8471f393cb6d35 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -97,7 +97,8 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { pRowSup->groupId = groupId; } -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, + uint64_t groupId) { pRowSup->startRowIndex = rowIndex; pRowSup->numOfRows = 0; pRowSup->win.skey = tsList[rowIndex]; @@ -869,7 +870,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ } static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { - return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);; + return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); + ; } static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { @@ -910,9 +912,9 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { return; - } + } int32_t delSize = taosArrayGetSize(pDelWins); - void* pIte = NULL; + void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { SResKeyPos* pResKey = (SResKeyPos*)pIte; int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); @@ -1592,9 +1594,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1874,7 +1876,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* _error: destroyIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -1931,7 +1932,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr _error: destroyIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -1965,7 +1965,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); } else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || - (pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) { + (pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); if (j == 0 && pRowSup->startRowIndex != 0) { @@ -2118,7 +2118,7 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex, bool isLastRow) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - bool fillLastPoint = pSliceInfo->fillLastPoint; + bool fillLastPoint = pSliceInfo->fillLastPoint; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); @@ -2150,11 +2150,9 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo } pSliceInfo->fillLastPoint = isLastRow ? true : false; - } -static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, - SSDataBlock* pResBlock) { +static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { int32_t rows = pResBlock->info.rows; // todo set the correct primary timestamp column @@ -2165,7 +2163,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId; - //SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + // SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); switch (pSliceInfo->fillType) { @@ -2181,15 +2179,15 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { float v = 0; GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { double v = 0; GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { int64_t v = 0; GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } pResBlock->info.rows += 1; break; @@ -2198,8 +2196,8 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp case TSDB_FILL_LINEAR: { SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); - SPoint start = pLinearInfo->start; - SPoint end = pLinearInfo->end; + SPoint start = pLinearInfo->start; + SPoint end = pLinearInfo->end; SPoint current = {.key = pSliceInfo->current}; current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); @@ -2212,7 +2210,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp colDataAppendNULL(pDst, rows); } else { taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); - colDataAppend(pDst, rows, (char *)current.val, false); + colDataAppend(pDst, rows, (char*)current.val, false); } taosMemoryFree(current.val); @@ -2318,11 +2316,11 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB SFillLinearInfo linearInfo = {0}; linearInfo.start.key = INT64_MIN; - linearInfo.end.key = INT64_MAX; + linearInfo.end.key = INT64_MAX; linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.hasNull = false; - linearInfo.type = pColInfo->info.type; + linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.hasNull = false; + linearInfo.type = pColInfo->info.type; linearInfo.bytes = pColInfo->info.bytes; taosArrayPush(pInfo->pLinearInfo, &linearInfo); } @@ -2400,7 +2398,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block + if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block doKeepLinearInfo(pSliceInfo, pBlock, i, false); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); @@ -2446,8 +2444,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2458,11 +2456,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } } - } else {// it is the last row of current block - //store ts value as start, and calculate interp value when processing next block + } else { // it is the last row of current block + // store ts value as start, and calculate interp value when processing next block doKeepLinearInfo(pSliceInfo, pBlock, i, true); } - } else { // non-linear interpolation + } else { // non-linear interpolation pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { @@ -2480,7 +2478,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { // no need to increate pSliceInfo->current here - //pSliceInfo->current = + // pSliceInfo->current = // taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { doKeepLinearInfo(pSliceInfo, pBlock, i, false); @@ -2488,8 +2486,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2501,7 +2499,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } else { // non-linear interpolation + } else { // non-linear interpolation if (i < pBlock->info.rows - 1) { // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate doKeepNextRows(pSliceInfo, pBlock, i + 1); @@ -2509,8 +2507,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2557,7 +2555,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { pResBlock->info.rows += 1; doKeepPrevRows(pSliceInfo, pBlock, i); - if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); @@ -2567,8 +2564,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2580,7 +2577,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } else { // non-linear interpolation + } else { // non-linear interpolation pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); @@ -2596,12 +2593,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); @@ -2825,7 +2822,6 @@ _error: destroySWindowOperatorInfo(pInfo, numOfCols); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3462,7 +3458,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, _error: destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3647,7 +3642,6 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3765,8 +3759,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star return insertNewSessionWindow(pWinInfos, startTs, index + 1); } -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows, - int32_t start, int64_t gap, SHashObj* pStDeleted) { +int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, + int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) { for (int32_t i = start; i < rows; ++i) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { return i - start; @@ -3943,8 +3937,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } int32_t winIndex = 0; SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); - winRows = - updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); + winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, + pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -4051,7 +4045,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SWinRes* res = *Ite; + SWinRes* res = *Ite; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -4182,7 +4176,7 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { int32_t size = taosArrayGetSize(pResWins); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; + SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); } } @@ -4479,7 +4473,6 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -4916,7 +4909,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _error: destroyStreamStateOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5026,7 +5018,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); } - static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -5101,7 +5092,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { blockDataCleanup(pRes); if (iaInfo->binfo.mergeResultBlock) { - while(1) { + while (1) { if (pOperator->status == OP_EXEC_DONE) { break; } @@ -5191,7 +5182,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, _error: destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); - taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5496,7 +5486,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI _error: destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); - taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 95267e5f584feeae120352b883d890bca100d1a3..b234ff97c9acdad3847081741a08144aa8df6c55 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2176,7 +2176,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -2191,7 +2191,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -2590,7 +2590,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 850ddd49700366e10265f661ffae698d9bdbc85b..be64a8b44d28a76a0b04a78b3940bcb0c86101da 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -62,12 +62,13 @@ static void indexDestroy(void* sIdx); void indexInit() { // refactor later - indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL); indexRefMgt = taosOpenRef(1000, indexDestroy); } void indexCleanup() { // refacto later taosCleanUpScheduler(indexQhandle); + taosMemoryFreeClear(indexQhandle); taosCloseRef(indexRefMgt); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ba97d47e19547883d60e12fd031ac8be723a3507..ef985a38944288e156fa370384272197421a8920 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2037,7 +2037,17 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, code = getDBVgInfoImpl(pCxt, pName, &vgroupList); } - if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) { + if (TSDB_CODE_SUCCESS == code && + 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && + 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && + isSelectStmt(pCxt->pCurrStmt) && + 0 == taosArrayGetSize(vgroupList)) { + ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true; + } + + if (TSDB_CODE_SUCCESS == code && + 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && + 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) { code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 41333e77566f26b34e3a37f7181c59e129fd4d0e..5143aa4af1f90ba0e7a0ac2f37af6648ed68c685 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static void* pTaskQueue = NULL; +static SSchedQueue pTaskQueue = {0}; int32_t initTaskQueue() { int32_t queueSize = tsMaxShellConns * 2; - pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc"); - if (NULL == pTaskQueue) { + void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); + if (NULL == p) { qError("failed to init task queue"); return -1; } @@ -111,7 +111,7 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - taosCleanUpScheduler(pTaskQueue); + taosCleanUpScheduler(&pTaskQueue); return 0; } @@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) schedMsg.thandle = execParam; schedMsg.msg = code; - taosScheduleTask(pTaskQueue, &schedMsg); + taosScheduleTask(&pTaskQueue, &schedMsg); return 0; } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 07575336a1ce521b47970896eefed8808dcb26b8..5d17536874db144591f156de6a3e22adc52111e7 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -302,6 +302,7 @@ void *taosMemoryStrDup(const char *ptr) { } void taosMemoryFree(void *ptr) { + if (NULL == ptr) return; #ifdef USE_TD_MEMORY TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 28e15dc9a31e3ef68dcbaba01ee71c3c64f805d3..83cfbb2ad194e5613aa0fef74ee28ccf160efe5f 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -391,7 +391,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * assert(TAOS_LRU_ENTRY_IN_CACHE(old)); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); - if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { + if (!TAOS_LRU_ENTRY_HAS_REFS(old)) { taosLRUCacheShardLRURemove(shard, old); assert(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 9abce966f5f97fc1f5b050f3a9bd3e0d2e8a3519..89471c4347e49152b2b1fff0313ef29da92c70ff 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -22,30 +22,16 @@ #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. -typedef struct { - char label[TSDB_LABEL_LEN]; - tsem_t emptySem; - tsem_t fullSem; - TdThreadMutex queueMutex; - int32_t fullSlot; - int32_t emptySlot; - int32_t queueSize; - int32_t numOfThreads; - TdThread *qthread; - SSchedMsg *queue; - bool stop; - void *pTmrCtrl; - void *pTimer; -} SSchedQueue; - static void *taosProcessSchedQueue(void *param); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); -void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) { - SSchedQueue *pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); - if (pSched == NULL) { - uError("%s: no enough memory for pSched", label); - return NULL; +void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) { + if (NULL == pSched) { + pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); + if (pSched == NULL) { + uError("%s: no enough memory for pSched", label); + return NULL; + } } pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize); @@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab return NULL; } - pSched->stop = false; + atomic_store_8(&pSched->stop, 0); for (int32_t i = 0; i < numOfThreads; ++i) { TdThreadAttr attr; taosThreadAttrInit(&attr); @@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab } void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) { - SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label); + SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL); if (tmrCtrl != NULL && pSched != NULL) { pSched->pTmrCtrl = tmrCtrl; @@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) { uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); ASSERT(0); } - if (pSched->stop) { + if (atomic_load_8(&pSched->stop)) { break; } @@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { return; } + if (atomic_load_8(&pSched->stop)) { + uError("sched is already stopped, msg:%p is dropped", pMsg); + return; + } + if ((ret = tsem_wait(&pSched->emptySem)) != 0) { uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); ASSERT(0); @@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) { uDebug("start to cleanup %s schedQsueue", pSched->label); - pSched->stop = true; + atomic_store_8(&pSched->stop, 1); + + taosMsleep(200); + for (int32_t i = 0; i < pSched->numOfThreads; ++i) { if (taosCheckPthreadValid(pSched->qthread[i])) { tsem_post(&pSched->fullSem); @@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) { if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->qthread) taosMemoryFree(pSched->qthread); - taosMemoryFree(pSched); // fix memory leak + //taosMemoryFree(pSched); } // for debug purpose, dump the scheduler status every 1min. diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 7256331c85aa21f4f05e90aa6560fbcdb11b1a81..3a868c7f979b19e4c61bcb82985055e05569d486 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) { return; } - tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); + tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL); taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads); @@ -606,6 +606,7 @@ void taosTmrCleanUp(void* handle) { taosUninitTimer(); taosCleanUpScheduler(tmrQhandle); + taosMemoryFreeClear(tmrQhandle); for (int32_t i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i; diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 162e74ea141bd59ea2ad6c9a856ecaa589698413..274476e17c44fb7562fef7d92134b30ec4289263 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -210,6 +210,8 @@ if $rows != 3 then return -1 endi +sql_error select * from performance_schema.PERF_OFFSETS; + sql show create stable stb; if $rows != 1 then return -1 diff --git a/tests/script/tsim/user/privilege_sysinfo.sim b/tests/script/tsim/user/privilege_sysinfo.sim index 718083f0d959dfae43a1c583e0039961bfd0d98e..25c1a84db699a8cdef5678abaf728f4a93690bde 100644 --- a/tests/script/tsim/user/privilege_sysinfo.sim +++ b/tests/script/tsim/user/privilege_sysinfo.sim @@ -45,19 +45,19 @@ sql_error drop database db sql_error use db sql_error alter database db replica 1; sql_error show db.vgroups -sql_error select * from information_schema.ins_stables where db_name = 'db' -sql_error select * from information_schema.ins_tables where db_name = 'db' +sql select * from information_schema.ins_stables where db_name = 'db' +sql select * from information_schema.ins_tables where db_name = 'db' print =============== check show -sql_error select * from information_schema.ins_users +sql select * from information_schema.ins_users sql_error show cluster -sql_error select * from information_schema.ins_dnodes -sql_error select * from information_schema.ins_mnodes +sql select * from information_schema.ins_dnodes +sql select * from information_schema.ins_mnodes sql_error show snodes -sql_error select * from information_schema.ins_qnodes +sql select * from information_schema.ins_qnodes sql_error show bnodes sql_error show grants sql_error show dnode 1 variables; sql show variables; -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/sample.py b/tests/system-test/2-query/sample.py index 34615c051537b4478777acf351383236be836d43..46d2062341e67be45b5cedd72564c8be8ef04a71 100644 --- a/tests/system-test/2-query/sample.py +++ b/tests/system-test/2-query/sample.py @@ -515,7 +515,7 @@ class TDTestCase: # "condition": "where ts>0 and ts < now interval(1h) fill(next)" # } # self.checksample(**err45) # interval - tdSql.query("select sample( c1 , 1 ) from t1 where ts>0 and ts < now interval(1h) fill(next)") + tdSql.error("select sample( c1 , 1 ) from t1 where ts>0 and ts < now interval(1h) fill(next)") err46 = { "table_expr": "t1", "condition": "group by c6"