提交 bc91b0ab 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -38,40 +38,21 @@ def pre_test(){
sh '''
cd ${WK}
git reset --hard
git remote prune origin
git fetch
cd ${WKC}
git reset --hard
git clean -fxd
git remote prune origin
git fetch
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WK}
git checkout master
cd ${WKC}
git checkout master
} else if(env.CHANGE_TARGET == '2.0') {
sh '''
cd ${WK}
git checkout 2.0
cd ${WKC}
git checkout 2.0
} else if(env.CHANGE_TARGET == '3.0') {
sh '''
cd ${WK}
git checkout 3.0
cd ${WKC}
git checkout 3.0
} else {
sh '''
cd ${WK}
git checkout develop
cd ${WKC}
git checkout develop
sh '''
cd ${WK}
git checkout ''' + env.CHANGE_TARGET + '''
cd ${WKC}
git checkout ''' + env.CHANGE_TARGET + '''
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
......@@ -169,49 +150,24 @@ def pre_test_win(){
bat '''
git reset --hard
git remote prune origin
git fetch
bat '''
git reset --hard
git remote prune origin
git fetch
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
git checkout master
bat '''
git checkout master
} else if(env.CHANGE_TARGET == '2.0') {
bat '''
git checkout 2.0
bat '''
git checkout 2.0
} else if(env.CHANGE_TARGET == '3.0') {
bat '''
git checkout 3.0
bat '''
git checkout 3.0
} else {
bat '''
git checkout develop
bat '''
git checkout develop
bat '''
git checkout ''' + env.CHANGE_TARGET + '''
bat '''
git checkout ''' + env.CHANGE_TARGET + '''
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
......@@ -309,7 +265,6 @@ def pre_test_build_win() {
bat '''
python.exe -m pip install --upgrade pip
python -m pip install .
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
......@@ -328,7 +283,6 @@ def run_win_test() {
bat '''
echo "windows test ..."
python.exe -m pip install --upgrade pip
python -m pip install .
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
ls -l C:\\Windows\\System32\\taos.dll
......@@ -188,7 +188,7 @@ apt install autoconf
cmake .. -DJEMALLOC_ENABLED=true
在 X86-64、X86、arm64、arm32 和 mips64 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 或 aarch32 等。
在 X86-64、X86、arm64 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 等。
......@@ -196,18 +196,6 @@ aarch64:
cmake .. -DCPUTYPE=aarch64 && cmake --build .
cmake .. -DCPUTYPE=aarch32 && cmake --build .
cmake .. -DCPUTYPE=mips64 && cmake --build .
### Windows 系统
如果你使用的是 Visual Studio 2013 版本:
......@@ -205,8 +205,8 @@ apt install autoconf
cmake .. -DJEMALLOC_ENABLED=true
TDengine build script can detect the host machine's architecture on X86-64, X86, arm64, arm32 and mips64 platform.
You can also specify CPUTYPE option like aarch64 or aarch32 too if the detection result is not correct:
TDengine build script can detect the host machine's architecture on X86-64, X86, arm64 platform.
You can also specify CPUTYPE option like aarch64 too if the detection result is not correct:
......@@ -214,18 +214,6 @@ aarch64:
cmake .. -DCPUTYPE=aarch64 && cmake --build .
cmake .. -DCPUTYPE=aarch32 && cmake --build .
cmake .. -DCPUTYPE=mips64 && cmake --build .
### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe".
......@@ -22,7 +22,9 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc)
......@@ -17,7 +17,6 @@ Currently, TDengine's native interface connectors can support platforms such as
| **X86 64bit** | **Win32** | ● | ● | ● | ● | ○ | ○ | ● |
| **X86 32bit** | **Win32** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **ARM64** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **ARM32** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **MIPS** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ○ |
Where ● means the official test verification passed, ○ means the unofficial test verification passed, -- means no assurance.
......@@ -19,15 +19,15 @@ TDengine's connector can support a wide range of platforms, including X64/X86/AR
The comparison matrix is as follows.
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **ARM32** | **MIPS** | **Alpha** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | --------- | --------- | --------- |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | ○ | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● |
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **MIPS** | **Alpha** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | --------- | --------- |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | -- |
| **Go** | ● | ● | ● | ○ | ● | ○ | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ○ | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● |
Note: ● means the official test is verified, ○ means the unofficial test is verified, -- means not verified.
......@@ -4,7 +4,7 @@ sidebar_label: 文档首页
slug: /
TDengine是一款开源、[高性能](https://www.taosdata.com/fast)、云原生的时序数据库(Time-Series Database, TSDB), 它专为物联网、工业互联网、金融等场景优化设计。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。
TDengine是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)的时序数据库(Time-Series Database, TSDB), 它专为物联网、工业互联网、金融等场景优化设计。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。
TDengine 充分利用了时序数据的特点,提出了“一个数据采集点一张表”与“超级表”的概念,设计了创新的存储引擎,让数据的写入、查询和存储效率都得到极大的提升。为正确理解并使用TDengine, 无论如何,请您仔细阅读[基本概念](./concept)一章。
......@@ -3,7 +3,7 @@ title: 产品简介
toc_max_heading_level: 2
TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供[缓存](/develop/cache/)[数据订阅](/develop/subscribe)[流式计算](/develop/continuous-query)等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。
TDengine 是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/tdengine/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供[缓存](/develop/cache/)[数据订阅](/develop/subscribe)[流式计算](/develop/continuous-query)等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。
......@@ -33,17 +33,17 @@ TDengine的主要功能如下:
由于 TDengine 充分利用了[时序数据特点](https://www.taosdata.com/blog/2019/07/09/105.html),比如结构化、无需事务、很少删除或更新、写多读少等等,设计了全新的针对时序数据的存储引擎和计算引擎,因此与其他时序数据库相比,TDengine 有以下特点:
- **高性能**:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。
- **[高性能](https://www.taosdata.com/tdengine/fast)**:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。
- **云原生**:通过原生分布式的设计,充分利用云平台的优势,TDengine 提供了水平扩展能力,具备弹性、韧性和可观测性,支持k8s部署,可运行在公有云、私有云和混合云上。
- **[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)**:通过原生分布式的设计,充分利用云平台的优势,TDengine 提供了水平扩展能力,具备弹性、韧性和可观测性,支持k8s部署,可运行在公有云、私有云和混合云上。
- **极简时序数据平台**:TDengine 内建消息队列、缓存、流式计算等功能,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降低系统的复杂度,降低应用开发和运营成本。
- **[极简时序数据平台](https://www.taosdata.com/tdengine/simplified_solution_for_time-series_data_processing)**:TDengine 内建消息队列、缓存、流式计算等功能,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降低系统的复杂度,降低应用开发和运营成本。
- **分析能力**:支持 SQL,同时为时序数据特有的分析提供SQL扩展。通过超级表、存储计算分离、分区分片、预计算、自定义函数等技术,TDengine 具备强大的分析能力。
- **[分析能力](https://www.taosdata.com/tdengine/easy_data_analytics)**:支持 SQL,同时为时序数据特有的分析提供SQL扩展。通过超级表、存储计算分离、分区分片、预计算、自定义函数等技术,TDengine 具备强大的分析能力。
- **简单易用**:无任何依赖,安装、集群几秒搞定;提供REST以及各种语言连接器,与众多第三方工具无缝集成;提供命令行程序,便于管理和即席查询;提供各种运维工具。
- **[简单易用](https://www.taosdata.com/tdengine/ease_of_use)**:无任何依赖,安装、集群几秒搞定;提供REST以及各种语言连接器,与众多第三方工具无缝集成;提供命令行程序,便于管理和即席查询;提供各种运维工具。
- **核心开源**:TDengine 的核心代码包括集群功能全部开源,截止到2022年8月1日,全球超过 135.9k 个运行实例,GitHub Star 18.7k,Fork 4.4k,社区活跃。
- **[核心开源](https://www.taosdata.com/tdengine/open_source_time-series_database)**:TDengine 的核心代码包括集群功能全部开源,截止到2022年8月1日,全球超过 135.9k 个运行实例,GitHub Star 18.7k,Fork 4.4k,社区活跃。
采用 TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。表现在几个方面:
......@@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem";
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包也支持通过 `apt-get` 工具从线上进行安装。
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包也支持通过 `apt-get` 工具从线上进行安装。
## 安装
......@@ -3,7 +3,7 @@ sidebar_label: 统计数据
title: 存储统计数据的 Performance_Schema 数据库
TDengine 3.0 版本开始提供一个内置数据库 `performance_schema`,其中存储了与性能有关的统计数据。本节详细介绍其中的表和详细的表结构。
TDengine 3.0 版本开始提供一个内置数据库 `performance_schema`,其中存储了与性能有关的统计数据。本节详细介绍其中的表和表结构。
......@@ -17,7 +17,6 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
| **X86 64bit** | **Win32** | ● | ● | ● | ● | ○ | ○ | ● |
| **X86 32bit** | **Win32** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **ARM64** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **ARM32** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **MIPS 龙芯** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ○ |
| **Alpha 申威** | **Linux** | ○ | ○ | -- | -- | -- | -- | ○ |
| **X86 海光** | **Linux** | ○ | ○ | ○ | -- | -- | -- | ○ |
......@@ -19,15 +19,15 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **ARM32** | **MIPS 龙芯** | **Alpha 申威** | **X64 海光** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | --------- | ------------- | -------------- | ------------ |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● |
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **MIPS 龙芯** | **Alpha 申威** | **X64 海光** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | ------------- | -------------- | ------------ |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ○ | -- | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● |
注:● 表示官方测试验证通过,○ 表示非官方测试验证通过,-- 表示未经验证。
......@@ -103,12 +103,12 @@ typedef struct SDataBlockInfo {
int16_t hasVarCol;
uint32_t capacity;
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
int64_t ts; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark;// used for stream
int64_t version; // used for stream, and need serialization
int64_t ts; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -268,6 +268,15 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
// stream special block column
#ifdef __cplusplus
......@@ -34,6 +34,8 @@ typedef struct SStreamTask SStreamTask;
enum {
......@@ -194,6 +194,9 @@ function install_bin() {
${csudo}rm -f ${bin_link_dir}/${serverName} || :
${csudo}rm -f ${bin_link_dir}/${adapterName} || :
${csudo}rm -f ${bin_link_dir}/${uninstallScript} || :
${csudo}rm -f ${bin_link_dir}/${demoName} || :
${csudo}rm -f ${bin_link_dir}/${benchmarkName} || :
${csudo}rm -f ${bin_link_dir}/${dumpName} || :
${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
......@@ -205,7 +208,6 @@ function install_bin() {
[ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -s ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || :
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
[ -x ${install_main_dir}/bin/${tmqName} ] && ${csudo}ln -s ${install_main_dir}/bin/${tmqName} ${bin_link_dir}/${tmqName} || :
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
......@@ -964,12 +966,17 @@ function installProduct() {
## ==============================Main program starts from here============================
if [ "$verType" == "server" ]; then
# Install server and client
if [ -x ${bin_dir}/${serverName} ]; then
# Check default 2.x data file.
if [ -x ${data_dir}/dnode/dnodeCfg.json ]; then
echo -e "\033[44;31;5mThe default data directory ${data_dir} contains old data of tdengine 2.x, please clear it before installing!\033[0m"
# Install server and client
if [ -x ${bin_dir}/${serverName} ]; then
elif [ "$verType" == "client" ]; then
......@@ -135,12 +135,12 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
static const SSysDbTableSchema userTblsSchema[] = {
......@@ -53,14 +53,27 @@ static bool dmCheckDiskSpace() {
if (!osDataSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
if (!osLogSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
if (!osTempSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
return true;
static bool dmCheckDataDirVersion() {
char checkDataDirJsonFileName[PATH_MAX];
snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
if (taosCheckExistFile(checkDataDirJsonFileName)) {
dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!", tsDataDir);
return false;
return true;
......@@ -68,6 +81,7 @@ static bool dmCheckDiskSpace() {
int32_t dmInit(int8_t rtype) {
dInfo("start to init dnode env");
if (!dmCheckDataDirVersion()) return -1;
if (!dmCheckDiskSpace()) return -1;
if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
if (dmInitSystem() != 0) return -1;
......@@ -197,6 +197,30 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream);
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
int8_t status = atomic_load_8(&pStream->status);
if (status == STREAM_STATUS__NORMAL) {
strcpy(dst, "normal");
} else if (status == STREAM_STATUS__STOP) {
strcpy(dst, "stop");
} else if (status == STREAM_STATUS__FAILED) {
strcpy(dst, "failed");
} else if (status == STREAM_STATUS__RECOVER) {
strcpy(dst, "recover");
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
strcpy(dst, "at once");
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
strcpy(dst, "window close");
} else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
strcpy(dst, "max delay");
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
pCreate->targetStbFullName[0] == 0) {
......@@ -926,8 +950,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
char status[20 + VARSTR_HEADER_SIZE] = {0};
mndShowStreamStatus(&status[VARSTR_HEADER_SIZE], pStream);
varDataSetLen(status, strlen(varDataVal(status)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB);
......@@ -958,8 +985,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
mndShowStreamTrigger(&trigger[VARSTR_HEADER_SIZE], pStream);
varDataSetLen(trigger, strlen(varDataVal(trigger)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
colDataAppend(pColInfo, numOfRows, (const char *)&trigger, false);
sdbRelease(pSdb, pStream);
......@@ -171,7 +171,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
// sma
......@@ -201,8 +201,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
SSubmitReq *pSubmitReq =
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......@@ -13,10 +13,44 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "tcommon.h"
#include "tmsg.h"
#include "tq.h"
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) {
int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
SBatchDeleteReq* deleteReq) {
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
int32_t totRow = pDataBlock->info.rows;
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
int64_t groupId = 0;
char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, name) < 0) {
return -1;
int64_t uid = mr.me.uid;
SSingleDeleteReq req = {
.ts = ts,
.uid = uid,
taosArrayPush(deleteReq->deleteReqs, &req);
return 0;
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
......@@ -33,9 +67,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_DATA) {
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
int32_t padding1 = 0;
void* padding2 = taosMemoryMalloc(1);
taosArrayPush(schemaReqSz, &padding1);
taosArrayPush(schemaReqs, &padding2);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
......@@ -97,7 +135,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
......@@ -119,6 +160,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
......@@ -188,7 +234,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......@@ -201,12 +247,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
SEncoder encoder;
void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead));
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
SRpcMsg msg = {
......@@ -145,7 +145,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t len;
int32_t ret;
vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
pVnode->state.applied = version;
......@@ -1071,6 +1071,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
return 0;
......@@ -883,6 +883,32 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
int32_t ctgHashValueComp(void const *lp, void const *rp) {
uint32_t *key = (uint32_t *)lp;
SVgroupInfo *pVg = *(SVgroupInfo **)rp;
if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
return 0;
int ctgVgInfoComp(const void* lp, const void* rp) {
SVgroupInfo *pLeft = *(SVgroupInfo **)lp;
SVgroupInfo *pRight = *(SVgroupInfo **)rp;
if (pLeft->hashBegin < pRight->hashBegin) {
return -1;
} else if (pLeft->hashBegin > pRight->hashBegin) {
return 1;
return 0;
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
......@@ -923,9 +949,19 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
taosHashCancelIterate(dbInfo->vgHash, pIter);
SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
taosArrayPush(pVgList, &pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
taosArraySort(pVgList, ctgVgInfoComp);
char tbFullName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFullName, "%s.", dbFName);
int32_t offset = strlen(tbFullName);
......@@ -940,25 +976,20 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
SVgroupInfo **p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ);
if (NULL == vgInfo) {
if (NULL == p) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
vgInfo = *p;
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == pNewVg) {
......@@ -977,6 +1008,8 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
......@@ -52,13 +52,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
enum {
// when this task starts to execute, this status will set
......@@ -701,6 +694,7 @@ typedef struct SSessionAggOperatorInfo {
typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
uint64_t groupId;
bool isOutput;
bool isClosed;
} SResultWindowInfo;
......@@ -1015,8 +1009,6 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
......@@ -1949,6 +1949,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) {
qWarn("failed to acquire exchange operator, since it may have been released");
......@@ -1969,6 +1970,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
} else {
pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
......@@ -3734,7 +3734,7 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
return insertNewSessionWindow(pWinInfos, startTs, index + 1);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows,
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
......@@ -3742,7 +3742,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
if (pWinInfo->win.skey > pStartTs[i]) {
if (pStDeleted && pWinInfo->isOutput) {
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
pWinInfo->isOutput = false;
pWinInfo->win.skey = pStartTs[i];
......@@ -3861,7 +3862,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
if (pWinInfo->isOutput) {
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
pWinInfo->isOutput = false;
taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
......@@ -3911,7 +3913,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
winRows =
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -3960,6 +3962,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
deleteWindow(pAggSup->pCurWins, winIndex, fp);
if (result) {
pCurWin->groupId = gpDatas[i];
taosArrayPush(result, pCurWin);
......@@ -3980,7 +3983,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
step = 1;
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, 0, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
if (result) {
......@@ -4017,12 +4020,11 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0;
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false);
for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(pColInfoData, pBlock->info.rows);
SWinRes* res = *Ite;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
pBlock->info.rows += 1;
if (pBlock->info.rows + 1 >= pBlock->info.capacity) {
......@@ -4149,7 +4151,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t size = taosArrayGetSize(pResWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
......@@ -37,17 +37,6 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
int32_t maxRows = 0;
for (int32_t j = 0; j < num; ++j) {
#if 0
int32_t id = pCtx[j].functionId;
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
if (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo != NULL && maxRows < pResInfo->numOfRes) {
maxRows = pResInfo->numOfRes;
......@@ -237,8 +237,8 @@
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
#./test.sh -f tsim/stream/session0.sim
#./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/triggerInterval0.sim
./test.sh -f tsim/stream/triggerSession0.sim
......@@ -30,7 +30,7 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
def diff_query_form(self, col="c1", alias="", table_expr="t1", condition=""):
def diff_query_form(self, col="c1", alias="", table_expr="db.t1", condition=""):
diff function:
......@@ -44,7 +44,7 @@ class TDTestCase:
return f"select diff({col}) {alias} from {table_expr} {condition}"
def checkdiff(self,col="c1", alias="", table_expr="t1", condition="" ):
def checkdiff(self,col="c1", alias="", table_expr="db.t1", condition="" ):
line = sys._getframe().f_back.f_lineno
pre_sql = self.diff_query_form(
col=col, table_expr=table_expr, condition=condition
......@@ -60,7 +60,7 @@ class TDTestCase:
if "order by tbname" in condition:
col=col, alias=alias, table_expr=table_expr, condition=condition
......@@ -164,9 +164,9 @@ class TDTestCase:
# case7~8: nested query
# case7 = {"table_expr": "(select c1 from stb1)"}
# case7 = {"table_expr": "(select c1 from db.stb1)"}
# self.checkdiff(**case7)
# case8 = {"table_expr": "(select diff(c1) c1 from stb1 group by tbname)"}
# case8 = {"table_expr": "(select diff(c1) c1 from db.stb1 group by tbname)"}
# self.checkdiff(**case8)
# case9~10: mix with tbname/ts/tag/col
......@@ -200,15 +200,15 @@ class TDTestCase:
# case18~19: with group by
# case18 = {
# "table_expr": "t1",
# "table_expr": "db.t1",
# "condition": "group by c6"
# }
# self.checkdiff(**case18)
# case19 = {
# "table_expr": "stb1",
# "condition": "partition by tbname" # partition by tbname
# }
# self.checkdiff(**case19)
case19 = {
"table_expr": "db.stb1",
"condition": "partition by tbname order by tbname" # partition by tbname
# # case20~21: with order by
# case20 = {"condition": "order by ts"}
......@@ -226,7 +226,7 @@ class TDTestCase:
# case24 = {
# "table_expr": "stb1",
# "table_expr": "db.stb1",
# "condition": "group by tbname slimit 1 soffset 1"
# }
# self.checkdiff(**case24)
......@@ -241,13 +241,13 @@ class TDTestCase:
# form test
tdSql.error(self.diff_query_form(col="")) # no col
tdSql.error("diff(c1) from stb1") # no select
tdSql.error("select diff from t1") # no diff condition
tdSql.error("select diff c1 from t1") # no brackets
tdSql.error("select diff(c1) t1") # no from
tdSql.error("diff(c1) from db.stb1") # no select
tdSql.error("select diff from db.t1") # no diff condition
tdSql.error("select diff c1 from db.t1") # no brackets
tdSql.error("select diff(c1) db.t1") # no from
tdSql.error("select diff( c1 ) from ") # no table_expr
# tdSql.error(self.diff_query_form(col="st1")) # tag col
tdSql.query("select diff(st1) from t1 ")
tdSql.query("select diff(st1) from db.t1")
# tdSql.error(self.diff_query_form(col=1)) # col is a value
tdSql.error(self.diff_query_form(col="'c1'")) # col is a string
tdSql.error(self.diff_query_form(col=None)) # col is NULL 1
......@@ -258,18 +258,18 @@ class TDTestCase:
tdSql.error(self.diff_query_form(col='c.')) # col is spercial char 3
tdSql.error(self.diff_query_form(col='avg(c1)')) # expr col
# tdSql.error(self.diff_query_form(col='c6')) # bool col
tdSql.query("select diff(c6) from t1")
tdSql.query("select diff(c6) from db.t1")
tdSql.error(self.diff_query_form(col='c4')) # binary col
tdSql.error(self.diff_query_form(col='c10')) # nachr col
tdSql.error(self.diff_query_form(col='c10')) # not table_expr col
tdSql.error(self.diff_query_form(col='t1')) # tbname
tdSql.error(self.diff_query_form(col='stb1')) # stbname
tdSql.error(self.diff_query_form(col='db.t1')) # tbname
tdSql.error(self.diff_query_form(col='db.stb1')) # stbname
tdSql.error(self.diff_query_form(col='db')) # datbasename
# tdSql.error(self.diff_query_form(col=True)) # col is BOOL 1
# tdSql.error(self.diff_query_form(col='True')) # col is BOOL 2
tdSql.error(self.diff_query_form(col='*')) # col is all col
tdSql.error("select diff[c1] from t1") # sql form error 1
tdSql.error("select diff{c1} from t1") # sql form error 2
tdSql.error("select diff[c1] from db.t1") # sql form error 1
tdSql.error("select diff{c1} from db.t1") # sql form error 2
tdSql.error(self.diff_query_form(col="[c1]")) # sql form error 3
# tdSql.error(self.diff_query_form(col="c1, c2")) # sql form error 3
# tdSql.error(self.diff_query_form(col="c1, 2")) # sql form error 3
......@@ -282,7 +282,7 @@ class TDTestCase:
# tdSql.error(self.diff_query_form(alias=" + 2")) # mix with arithmetic 1
tdSql.error(self.diff_query_form(alias=" + avg(c1)")) # mix with arithmetic 2
tdSql.query(self.diff_query_form(alias=", c2")) # mix with other 1
# tdSql.error(self.diff_query_form(table_expr="stb1")) # select stb directly
# tdSql.error(self.diff_query_form(table_expr="db.stb1")) # select stb directly
stb_join = {
"col": "stb1.c1",
"table_expr": "stb1, stb2",
......@@ -294,17 +294,17 @@ class TDTestCase:
tdSql.error(self.diff_query_form(**interval_sql)) # interval
group_normal_col = {
"table_expr": "t1",
"table_expr": "db.t1",
"condition": "group by c6"
tdSql.error(self.diff_query_form(**group_normal_col)) # group by normal col
slimit_soffset_sql = {
"table_expr": "stb1",
"table_expr": "db.stb1",
"condition": "group by tbname slimit 1 soffset 1"
# tdSql.error(self.diff_query_form(**slimit_soffset_sql))
order_by_tbname_sql = {
"table_expr": "stb1",
"table_expr": "db.stb1",
"condition": "group by tbname order by tbname"
......@@ -349,63 +349,40 @@ class TDTestCase:
"create stable db.stb2 (ts timestamp, c1 int) tags(st2 int)"
for i in range(tbnum):
tdSql.execute(f"create table t{i} using stb1 tags({i})")
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
tdSql.execute(f"create table t{i} using db.stb1 tags({i})")
tdSql.execute(f"create table tt{i} using db.stb2 tags({i})")
def diff_support_stable(self):
tdSql.query(" select diff(1) from stb1 ")
tdSql.query(" select diff(1) from db.stb1 ")
tdSql.query("select diff(c1) from stb1 partition by tbname ")
tdSql.query("select diff(c1) from db.stb1 partition by tbname ")
# tdSql.query("select diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(229)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
# # bug need fix
# tdSql.query("select diff(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(19)
# tdSql.error("select diff(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
# bug need fix
# tdSql.query("select tbname , diff(c1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(19)
tdSql.query("select tbname , diff(c1) from db.stb1 partition by tbname")
tdSql.query("select tbname , diff(st1) from db.stb1 partition by tbname")
# partition by tags
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# partition by col
# tdSql.query("select c1 , diff(c1) from stb1 partition by c1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# partition by tags
tdSql.query("select st1 , diff(c1) from db.stb1 partition by st1")
tdSql.query("select diff(c1) from db.stb1 partition by st1")
def diff_test_run(self) :
......@@ -428,18 +405,18 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})")
......@@ -335,7 +335,7 @@ class TDTestCase:
# case11 = {"alias": ", st1"}
# self.checksample(**case11)
tdSql.query("select sample( c1 , 1 ) , st1 from t1")
# case12 = {"alias": ", c1"}
# self.checksample(**case12)
......@@ -497,7 +497,7 @@ class TDTestCase:
# tdSql.query(" select sample(c1 , 1) + 2 from t1 ")
err41 = {"alias": "+ avg(c1)"}
# self.checksample(**err41) # mix with arithmetic 2
# err42 = {"alias": ", c1"}
# self.checksample(**err42)
tdSql.query("select sample( c1 , 1 ) , c1 from t1")
......@@ -605,14 +605,14 @@ class TDTestCase:
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
def check_sample(self , sample_query , origin_query ):
origin_datas = tdSql.queryResult
sample_datas = tdSql.queryResult
......@@ -620,7 +620,7 @@ class TDTestCase:
for ind , sample_data in enumerate(sample_datas):
if sample_data not in origin_datas:
status = False
if status:
tdLog.info(" sample data is in datas groups ,successed sql is : %s" % sample_query )
......@@ -637,7 +637,7 @@ class TDTestCase:
tags (t1 int)
create table t1
......@@ -689,7 +689,7 @@ class TDTestCase:
tdSql.error(" select sample(c1,ts) from t1 ")
tdSql.error(" select sample(c1,false) from t1 ")
tdSql.query(" select sample(123,1) from t1 ")
tdSql.query(" select sample(c1,2) from t1 ")
tdSql.query(" select sample(c1,10) from t1 ")
......@@ -704,10 +704,10 @@ class TDTestCase:
tdSql.error(" select sample(c1,-1) from t1 ")
# bug need fix
# bug need fix
# tdSql.query("select sample(c1 ,2) , 123 from stb1;")
# all type support
# all type support
tdSql.query(" select sample(c1 , 20 ) from ct4 ")
......@@ -761,7 +761,7 @@ class TDTestCase:
self.check_sample("select sample( c1 ,3 ) from t1 where c1 between 1 and 10" ,"select c1 from t1 where c1 between 1 and 10")
# join
# join
tdSql.query("select sample( ct4.c1 , 1 ) from ct1, ct4 where ct4.ts=ct1.ts")
......@@ -772,22 +772,22 @@ class TDTestCase:
self.check_sample("select sample(c1,2) from stb1 partition by tbname" , "select c1 from stb1 partition by tbname")
# nest query
# nest query
# tdSql.query("select sample(c1,2) from (select c1 from t1); ")
# tdSql.checkRows(2)
# union all
# union all
tdSql.query("select sample(c1,2) from t1 union all select sample(c1,3) from t1")
# fill interval
# not support mix with other function
# not support mix with other function
tdSql.error("select top(c1,2) , sample(c1,2) from ct1")
tdSql.error("select max(c1) , sample(c1,2) from ct1")
tdSql.query("select c1 , sample(c1,2) from ct1")
# bug for mix with scalar
# bug for mix with scalar
tdSql.query("select 123 , sample(c1,100) from ct1")
tdSql.query("select sample(c1,100)+2 from ct1")
tdSql.query("select abs(sample(c1,100)) from ct1")
......@@ -864,13 +864,13 @@ class TDTestCase:
for i in range(2000):
ts = self.ts+i*10
tdSql.execute(f"insert into sub_tb values({ts} ,{i})")
tdSql.query("select count(*) from st")
tdSql.query("select sample(c1 ,1000) from st")
# bug need fix
# bug need fix
tdSql.query("select c1 ,t1, sample(c1,2) from db.stb1 partition by c1 ")
tdSql.query("select sample(c1,2) from db.stb1 partition by c1 ")
# tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ")
......@@ -270,7 +270,7 @@ class TDTestCase:
if not vote_act:
print("=======before_revote_leader_infos ======\n" , before_leader_infos)
print("=======after_revote_leader_infos ======\n" , after_leader_infos)
tdLog.exit(" ===maybe revote not occured , there is no dnode offline ====")
tdLog.info(" ===maybe revote not occured , there is no dnode offline ====")
for vgroup_info in vote_act:
for ind , role in enumerate(vgroup_info):
......@@ -96,6 +96,8 @@ python3 ./test.py -f 2-query/distribute_agg_stddev.py
python3 ./test.py -f 2-query/distribute_agg_stddev.py -R
python3 ./test.py -f 2-query/distribute_agg_sum.py
python3 ./test.py -f 2-query/distribute_agg_sum.py -R
python3 ./test.py -f 2-query/interp.py
python3 ./test.py -f 2-query/interp.py -R
......@@ -213,7 +215,7 @@ python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_query
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader_forece_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_mnode3_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1
......@@ -338,6 +340,7 @@ python3 ./test.py -f 2-query/arcsin.py -Q 2
python3 ./test.py -f 2-query/arccos.py -Q 2
python3 ./test.py -f 2-query/arctan.py -Q 2
python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2
python3 ./test.py -f 2-query/interp.py -Q 2
# python3 ./test.py -f 2-query/nestedQuery.py -Q 2
# python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2
......@@ -457,3 +460,4 @@ python3 ./test.py -f 2-query/max_partition.py -Q 3
python3 ./test.py -f 2-query/last_row.py -Q 3
python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
python3 ./test.py -f 2-query/sml.py -Q 3
python3 ./test.py -f 2-query/interp.py -Q 3
......@@ -280,7 +280,7 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
static void shellInitArgs(int argc, char *argv[]) {
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-p", 2) == 0) {
// printf(shell.info.clientVersion, tsOsName, taos_get_client_info());
// printf(shell.info.clientVersion, taos_get_client_info());
if (strlen(argv[i]) == 2) {
printf("Enter password: ");
......@@ -389,8 +389,8 @@ static int32_t shellCheckArgs() {
int32_t shellParseArgs(int32_t argc, char *argv[]) {
shellInitArgs(argc, argv);
shell.info.clientVersion =
"Welcome to the TDengine shell from %s, Client Version:%s\r\n"
"Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.\r\n\r\n";
"Welcome to the TDengine Command Line Interface, Client Version:%s\r\n"
"Copyright (c) 2022 by TDengine, all rights reserved.\r\n\r\n";
shell.info.promptHeader = TAOS_CONSOLE_PROMPT_HEADER;
shell.info.promptContinue = TAOS_CONSOLE_PROMPT_CONTINUE;
shell.info.promptSize = 6;
......@@ -1001,7 +1001,7 @@ void *shellThreadLoop(void *arg) {
int32_t shellExecute() {
printf(shell.info.clientVersion, shell.info.osname, taos_get_client_info());
printf(shell.info.clientVersion, taos_get_client_info());
SShellArgs *pArgs = &shell.args;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册