提交 a0858f39 编写于 作者: A Alex Duan

Merge branch 'master' into max_cfg

......@@ -15,7 +15,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -23,6 +23,7 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm64_bionic
......@@ -39,7 +40,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -66,7 +67,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -91,7 +92,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -116,7 +117,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -142,7 +143,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch32 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -150,6 +151,7 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: build_trusty
......@@ -168,7 +170,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -176,6 +178,7 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: build_xenial
......@@ -193,7 +196,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -201,7 +204,7 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: build_bionic
......@@ -218,7 +221,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -226,6 +229,7 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: build_centos7
......@@ -241,7 +245,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -249,4 +253,4 @@ steps:
branch:
- develop
- master
- 2.0
\ No newline at end of file
......@@ -160,7 +160,6 @@ pipeline {
skipbuild='2'
skipbuild=sh(script: "git log -2 --pretty=%B | fgrep -ie '[skip ci]' -e '[ci skip]' && echo 1 || echo 2", returnStdout:true)
println skipbuild
}
sh'''
rm -rf ${WORKSPACE}.tes
......@@ -225,6 +224,26 @@ pipeline {
steps {
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
nohup taosd >/dev/null &
sleep 10
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
'''
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo
'''
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
......@@ -237,21 +256,19 @@ pipeline {
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
}
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
timeout(time: 60, unit: 'MINUTES'){
// sh '''
// cd ${WKC}/tests/pytest
// rm -rf /var/lib/taos/*
// rm -rf /var/log/taos/*
// ./handle_crash_gen_val_log.sh
// '''
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.1.6.0")
SET(TD_VER_NUMBER "2.1.7.2")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -196,7 +196,7 @@ not_compact_enough:
/* Normally defined in stdlib.h. Output buf must contain PATH_MAX bytes */
char *realpath(const char *path, char *outbuf) {
char *pOutbuf = outbuf;
char *pOutbuf1;
char *pOutbuf1 = NULL;
int iErr;
const char *pc;
......@@ -521,7 +521,7 @@ int ResolveLinksA(const char *path, char *buf, size_t bufsize) {
/* Normally defined in stdlib.h. Output buf must contain PATH_MAX bytes */
char *realpathU(const char *path, char *outbuf) {
char *pOutbuf = outbuf;
char *pOutbuf1;
char *pOutbuf1 = NULL;
char *pPath1 = NULL;
char *pPath2 = NULL;
int iErr;
......
Subproject commit 0ca5b15a8eac40327dd737be52c926fa5675712c
Subproject commit ceda5bf9fcd7836509ac97dcc0056b3f1dd48cc5
......@@ -2,18 +2,18 @@
## <a class="anchor" id="intro"></a>TDengine 简介
TDengine是涛思数据面对高速增长的物联网大数据市场和技术挑战推出的创新性的大数据处理产品,它不依赖任何第三方软件,也不是优化或包装了一个开源的数据库或流式计算产品,而是在吸取众多传统关系型数据库、NoSQL数据库、流式计算引擎、消息队列等软件的优点之后自主开发的产品,在时序空间大数据处理上,有着自己独到的优势。
TDengine 是涛思数据面对高速增长的物联网大数据市场和技术挑战推出的创新性的大数据处理产品,它不依赖任何第三方软件,也不是优化或包装了一个开源的数据库或流式计算产品,而是在吸取众多传统关系型数据库、NoSQL 数据库、流式计算引擎、消息队列等软件的优点之后自主开发的产品,在时序空间大数据处理上,有着自己独到的优势。
TDengine的模块之一是时序数据库。但除此之外,为减少研发的复杂度、系统维护的难度,TDengine还提供缓存、消息队列、订阅、流式计算等功能,为物联网、工业互联网大数据的处理提供全栈的技术方案,是一个高效易用的物联网大数据平台。与Hadoop等典型的大数据平台相比,它具有如下鲜明的特点:
TDengine 的模块之一是时序数据库。但除此之外,为减少研发的复杂度、系统维护的难度,TDengine 还提供缓存、消息队列、订阅、流式计算等功能,为物联网、工业互联网大数据的处理提供全栈的技术方案,是一个高效易用的物联网大数据平台。与 Hadoop 等典型的大数据平台相比,它具有如下鲜明的特点:
* __10倍以上的性能提升__:定义了创新的数据存储结构,单核每秒能处理至少2万次请求,插入数百万个数据点,读出一千万以上数据点,比现有通用数据库快十倍以上。
* __硬件或云服务成本降至1/5__:由于超强性能,计算资源不到通用大数据方案的1/5;通过列式存储和先进的压缩算法,存储空间不到通用数据库的1/10。
* __全栈时序数据处理引擎__:将数据库、消息队列、缓存、流式计算等功能融为一体,应用无需再集成Kafka/Redis/HBase/Spark/HDFS等软件,大幅降低应用开发和维护的复杂度成本。
* __强大的分析功能__:无论是十年前还是一秒钟前的数据,指定时间范围即可查询。数据可在时间轴上或多个设备上进行聚合。即席查询可通过Shell, Python, R, MATLAB随时进行。
* __与第三方工具无缝连接__:不用一行代码,即可与Telegraf, Grafana, EMQ, HiveMQ, Prometheus, MATLAB, R等集成。后续将支持OPC, Hadoop, Spark等, BI工具也将无缝连接。
* __零运维成本、零学习成本__:安装集群简单快捷,无需分库分表,实时备份。类似标准SQL,支持RESTful, 支持Python/Java/C/C++/C#/Go/Node.js, 与MySQL相似,零学习成本。
* __10 倍以上的性能提升__:定义了创新的数据存储结构,单核每秒能处理至少 2 万次请求,插入数百万个数据点,读出一千万以上数据点,比现有通用数据库快十倍以上。
* __硬件或云服务成本降至 1/5__:由于超强性能,计算资源不到通用大数据方案的 1/5;通过列式存储和先进的压缩算法,存储空间不到通用数据库的 1/10。
* __全栈时序数据处理引擎__:将数据库、消息队列、缓存、流式计算等功能融为一体,应用无需再集成 Kafka/Redis/HBase/Spark/HDFS 等软件,大幅降低应用开发和维护的复杂度成本。
* __强大的分析功能__:无论是十年前还是一秒钟前的数据,指定时间范围即可查询。数据可在时间轴上或多个设备上进行聚合。即席查询可通过 Shell, Python, R, MATLAB 随时进行。
* __与第三方工具无缝连接__:不用一行代码,即可与 Telegraf, Grafana, EMQ, HiveMQ, Prometheus, MATLAB, R 等集成。后续将支持 OPC, Hadoop, Spark 等,BI 工具也将无缝连接。
* __零运维成本、零学习成本__:安装集群简单快捷,无需分库分表,实时备份。类标准 SQL,支持 RESTful,支持 Python/Java/C/C++/C#/Go/Node.js, 与 MySQL 相似,零学习成本。
采用TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。但需要指出的是,因充分利用了物联网时序数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM等通用型数据。
采用 TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。但需要指出的是,因充分利用了物联网时序数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。
![TDengine技术生态图](page://images/eco_system.png)
<center>图 1. TDengine技术生态图</center>
......@@ -21,42 +21,47 @@ TDengine的模块之一是时序数据库。但除此之外,为减少研发的
## <a class="anchor" id="scenes"></a>TDengine 总体适用场景
作为一个IOT大数据平台,TDengine的典型适用场景是在IOT范畴,而且用户有一定的数据量。本文后续的介绍主要针对这个范畴里面的系统。范畴之外的系统,比如CRM,ERP等,不在本文讨论范围内。
作为一个 IOT 大数据平台,TDengine 的典型适用场景是在 IOT 范畴,而且用户有一定的数据量。本文后续的介绍主要针对这个范畴里面的系统。范畴之外的系统,比如 CRM,ERP 等,不在本文讨论范围内。
### 数据源特点和需求
从数据源角度,设计人员可以从下面几个角度分析TDengine在目标应用系统里面的适用性。
从数据源角度,设计人员可以从下面几个角度分析 TDengine 在目标应用系统里面的适用性。
|数据源特点和需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|总体数据量巨大| | | √ |TDengine在容量方面提供出色的水平扩展功能,并且具备匹配高压缩的存储结构,达到业界最优的存储效率。|
|数据输入速度偶尔或者持续巨大| | | √ | TDengine的性能大大超过同类产品,可以在同样的硬件环境下持续处理大量的输入数据,并且提供很容易在用户环境里面运行的性能评估工具。|
|数据源数目巨大| | | √ |TDengine设计中包含专门针对大量数据源的优化,包括数据的写入和查询,尤其适合高效处理海量(千万或者更多量级)的数据源。|
|总体数据量巨大| | | √ | TDengine 在容量方面提供出色的水平扩展功能,并且具备匹配高压缩的存储结构,达到业界最优的存储效率。|
|数据输入速度偶尔或者持续巨大| | | √ | TDengine 的性能大大超过同类产品,可以在同样的硬件环境下持续处理大量的输入数据,并且提供很容易在用户环境里面运行的性能评估工具。|
|数据源数目巨大| | | √ | TDengine 设计中包含专门针对大量数据源的优化,包括数据的写入和查询,尤其适合高效处理海量(千万或者更多量级)的数据源。|
### 系统架构要求
|系统架构要求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求简单可靠的系统架构| | | √ |TDengine的系统架构非常简单可靠,自带消息队列,缓存,流式计算,监控等功能,无需集成额外的第三方产品。|
|要求容错和高可靠| | | √ |TDengine的集群功能,自动提供容错灾备等高可靠功能。|
|标准化规范| | | √ |TDengine使用标准的SQL语言提供主要功能,遵守标准化规范。|
|要求简单可靠的系统架构| | | √ | TDengine 的系统架构非常简单可靠,自带消息队列,缓存,流式计算,监控等功能,无需集成额外的第三方产品。|
|要求容错和高可靠| | | √ | TDengine 的集群功能,自动提供容错灾备等高可靠功能。|
|标准化规范| | | √ | TDengine 使用标准的SQL语言提供主要功能,遵守标准化规范。|
### 系统功能需求
|系统功能需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求完整的内置数据处理算法| | √ | |TDengine的实现了通用的数据处理算法,但是还没有做到妥善处理各行各业的所有要求,因此特殊类型的处理还需要应用层面处理。|
|需要大量的交叉查询处理| | √ | |这种类型的处理更多应该用关系型数据系统处理,或者应该考虑TDengine和关系型数据系统配合实现系统功能。|
|要求完整的内置数据处理算法| | √ | | TDengine 的实现了通用的数据处理算法,但是还没有做到妥善处理各行各业的所有要求,因此特殊类型的处理还需要应用层面处理。|
|需要大量的交叉查询处理| | √ | |这种类型的处理更多应该用关系型数据系统处理,或者应该考虑 TDengine 和关系型数据系统配合实现系统功能。|
### 系统性能需求
|系统性能需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求较大的总体处理能力| | | √ |TDengine的集群功能可以轻松地让多服务器配合达成处理能力的提升。|
|要求高速处理数据 | | | √ |TDengine的专门为IOT优化的存储和数据处理的设计,一般可以让系统得到超出同类产品多倍数的处理速度提升。|
|要求快速处理小粒度数据| | | √ |这方面TDengine性能可以完全对标关系型和NoSQL型数据处理系统。|
|要求较大的总体处理能力| | | √ | TDengine 的集群功能可以轻松地让多服务器配合达成处理能力的提升。|
|要求高速处理数据 | | | √ | TDengine 的专门为 IOT 优化的存储和数据处理的设计,一般可以让系统得到超出同类产品多倍数的处理速度提升。|
|要求快速处理小粒度数据| | | √ |这方面 TDengine 性能可以完全对标关系型和 NoSQL 型数据处理系统。|
### 系统维护需求
|系统维护需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求系统可靠运行| | | √ |TDengine的系统架构非常稳定可靠,日常维护也简单便捷,对维护人员的要求简洁明了,最大程度上杜绝人为错误和事故。|
|要求系统可靠运行| | | √ | TDengine 的系统架构非常稳定可靠,日常维护也简单便捷,对维护人员的要求简洁明了,最大程度上杜绝人为错误和事故。|
|要求运维学习成本可控| | | √ |同上。|
|要求市场有大量人才储备| √ | | |TDengine作为新一代产品,目前人才市场里面有经验的人员还有限。但是学习成本低,我们作为厂家也提供运维的培训和辅助服务。|
|要求市场有大量人才储备| √ | | | TDengine 作为新一代产品,目前人才市场里面有经验的人员还有限。但是学习成本低,我们作为厂家也提供运维的培训和辅助服务。|
......@@ -144,6 +144,9 @@ keepColumnName 1
# max length of an SQL
# maxSQLLength 65480
# max length of WildCards
# maxWildCardsLength 100
# the maximum number of records allowed for super table time sorting
# maxNumOfOrderedRes 100000
......
......@@ -19,18 +19,21 @@ else
fi
# Dynamic directory
data_dir="/var/lib/taos"
if [ "$osType" != "Darwin" ]; then
data_dir="/var/lib/taos"
log_dir="/var/log/taos"
else
log_dir=~/TDengine/log
data_dir="/usr/local/var/lib/taos"
log_dir="/usr/local/var/log/taos"
fi
data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log"
cfg_install_dir="/etc/taos"
if [ "$osType" != "Darwin" ]; then
cfg_install_dir="/etc/taos"
else
cfg_install_dir="/usr/local/etc/taos"
fi
if [ "$osType" != "Darwin" ]; then
bin_link_dir="/usr/bin"
......@@ -44,10 +47,18 @@ else
fi
#install main path
install_main_dir="/usr/local/taos"
if [ "$osType" != "Darwin" ]; then
install_main_dir="/usr/local/taos"
else
install_main_dir="/usr/local/Cellar/tdengine/${verNumber}"
fi
# old bin dir
bin_dir="/usr/local/taos/bin"
if [ "$osType" != "Darwin" ]; then
bin_dir="/usr/local/taos/bin"
else
bin_dir="/usr/local/Cellar/tdengine/${verNumber}/bin"
fi
service_config_dir="/etc/systemd/system"
......@@ -59,12 +70,11 @@ GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo"
fi
if [ "$osType" != "Darwin" ]; then
if command -v sudo > /dev/null; then
csudo="sudo"
fi
initd_mod=0
service_mod=2
if pidof systemd &> /dev/null; then
......@@ -137,17 +147,17 @@ function install_main_path() {
function install_bin() {
# Remove links
${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/perfMonitor || :
${csudo} rm -f ${bin_link_dir}/set_core || :
${csudo} rm -f ${bin_link_dir}/rmtaos || :
fi
${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} cp -r ${binary_dir}/build/bin/* ${install_main_dir}/bin
${csudo} cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin
......@@ -161,19 +171,18 @@ function install_bin() {
${csudo} chmod 0555 ${install_main_dir}/bin/*
#Make link
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
[ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo} ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || :
[ -x ${install_main_dir}/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
fi
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || :
else
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || :
fi
}
......@@ -220,7 +229,7 @@ function install_jemalloc() {
fi
if [ -d /etc/ld.so.conf.d ]; then
${csudo} echo "/usr/local/lib" > /etc/ld.so.conf.d/jemalloc.conf
echo "/usr/local/lib" | ${csudo} tee /etc/ld.so.conf.d/jemalloc.conf
${csudo} ldconfig
else
echo "/etc/ld.so.conf.d not found!"
......@@ -246,10 +255,8 @@ function install_lib() {
fi
else
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
install_jemalloc
if [ "$osType" != "Darwin" ]; then
......@@ -259,10 +266,14 @@ function install_lib() {
function install_header() {
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
fi
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
if [ "$osType" != "Darwin" ]; then
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
fi
}
function install_config() {
......@@ -270,23 +281,20 @@ function install_config() {
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] &&
${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/*
fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
if [ "$osType" != "Darwin" ]; then ${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
fi
}
function install_log() {
${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
else
mkdir -p ${log_dir} && chmod 777 ${log_dir}
fi
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
${csudo} ln -s ${log_dir} ${install_main_dir}/log
}
......@@ -307,7 +315,6 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!"
fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
}
......@@ -487,24 +494,21 @@ function install_TDengine() {
else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
fi
install_main_path
if [ "$osType" != "Darwin" ]; then
install_data
fi
install_data
install_log
install_header
install_lib
install_connector
install_examples
install_bin
if [ "$osType" != "Darwin" ]; then
install_service
fi
install_config
if [ "$osType" != "Darwin" ]; then
......
name: tdengine
base: core18
version: '2.1.6.0'
version: '2.1.7.2'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.6.0
- usr/lib/libtaos.so.2.1.7.2
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -4,6 +4,8 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/plugins/http/inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
......
......@@ -50,6 +50,12 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -29,15 +29,16 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
......@@ -142,6 +143,7 @@ bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
......@@ -188,6 +190,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
......@@ -358,6 +361,8 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -38,6 +38,11 @@ extern "C" {
#include "qUtil.h"
#include "tcmdtype.h"
typedef enum {
TAOS_REQ_FROM_SHELL,
TAOS_REQ_FROM_HTTP
} SReqOrigin;
// forward declaration
struct SSqlInfo;
......@@ -123,17 +128,15 @@ typedef struct {
int32_t kvLen; // len of SKVRow
} SMemRowInfo;
typedef struct {
uint8_t memRowType;
uint8_t compareStat; // 0 unknown, 1 need compare, 2 no need
TDRowTLenT dataRowInitLen;
uint8_t memRowType; // default is 0, that is SDataRow
uint8_t compareStat; // 0 no need, 1 need compare
TDRowTLenT kvRowInitLen;
SMemRowInfo *rowInfo;
} SMemRowBuilder;
typedef enum {
ROW_COMPARE_UNKNOWN = 0,
ROW_COMPARE_NO_NEED = 0,
ROW_COMPARE_NEED = 1,
ROW_COMPARE_NO_NEED = 2,
} ERowCompareStat;
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
......@@ -309,6 +312,7 @@ typedef struct {
char * data;
TAOS_ROW tsrow;
TAOS_ROW urow;
bool dataConverted;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex* pColumnIndex;
......@@ -342,6 +346,7 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
SReqOrigin from;
} STscObj;
typedef struct SSubqueryState {
......@@ -377,6 +382,7 @@ typedef struct SSqlObj {
SSubqueryState subState;
struct SSqlObj **pSubs;
struct SSqlObj *rootObj;
int64_t metaRid;
int64_t svgroupRid;
......@@ -441,7 +447,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
......@@ -488,6 +494,7 @@ bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql);
int32_t tscInvalidOperationMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
......
此差异已折叠。
......@@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp;
pSql->fetchFp = fp;
pSql->rootObj = pSql;
registerSqlObj(pSql);
......@@ -168,6 +169,9 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} else {
pRes->code = numOfRows;
}
if (pRes->code == TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
}
tscAsyncResultOnError(pSql);
return;
......@@ -358,15 +362,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
tscImportDataFromFile(pSql);
......
......@@ -35,6 +35,7 @@ typedef struct SCompareParam {
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
......@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(*hasPrev) = true;
}
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
// output value to multiple rows
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
if (numOfRows <= 1) {
return ;
return;
}
for (int32_t k = 0; k < numOfOutput; ++k) {
......@@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
continue;
}
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
char* src = pCtx[k].pOutput;
char* src = pCtx[k].pOutput;
char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
for (int32_t i = 0; i < inc; ++i) {
pCtx[k].pOutput += pCtx[k].outputBytes;
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes);
// Let's start from the second row, as the first row has result value already.
for (int32_t i = 1; i < numOfRows; ++i) {
memcpy(dst, src, (size_t)pCtx[k].outputBytes);
dst += pCtx[k].outputBytes;
}
}
}
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
} else {
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
}
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
aAggs[functionId].xFinalize(&pCtx[j]);
}
}
}
......@@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
add[i] = pCtx[i].pInput;
addrPtr[i] = pCtx[i].pInput;
pCtx[i].size = 1;
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
} else {
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
......@@ -643,7 +649,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
pCtx[j].ptsOutputBuf = pCtx[0].pOutput;
if(j>0) pCtx[j].ptsOutputBuf = pCtx[j-1].pOutput;
}
}
......@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
}
} else {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
}
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
......@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
{
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].pInput = add[i];
pCtx[i].pInput = addrPtr[i];
}
}
tfree(add);
tfree(addrPtr);
}
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
......@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
bool sameGroup = true;
if (pInfo->hasPrev) {
// todo refactor extract method
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
// if this row belongs to current result set group
......@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
break;
}
bool sameGroup = true;
if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) {
sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup && !pAggInfo->multiGroupResults) {
*newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock;
......@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
if (handleData) { // data in current group is all handled
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pAggInfo->binfo.pRes->info.rows += numOfRows;
pAggInfo->binfo.pRes->info.rows += numOfRows;
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
}
......@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
return (pRes->info.rows != 0)? pRes:NULL;
}
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSLimitOperatorInfo *pInfo = pOperator->info;
assert(pInfo->currentGroupOffset >= 0);
static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
if (pInfo->currentOffset > 0) {
pInfo->currentOffset -= 1;
} else {
// discard the data rows in current group
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
for (int32_t i = 0; i < num1; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
SColumnInfo *pColInfo = &pColInfoData->info;
char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
memcpy(pDst, pSrc, pColInfo->bytes);
}
pInfo->rowsTotal += 1;
pInfo->pRes->info.rows += 1;
}
}
}
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
int32_t total = pResultBlock->info.rows + numOfRows;
size_t num = taosArrayGetSize(pResultBlock->pDataBlock);
for (int32_t i = 0; i < num; ++i) {
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
if (tmp != NULL) {
pInfoData->pData = tmp;
} else {
// todo handle the malloc failure
}
return pBlock;
pInfo->capacity = total;
pInfo->threshold = (int64_t) (total * 0.8);
}
}
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
enum {
BLOCK_NEW_GROUP = 1,
BLOCK_NO_GROUP = 2,
BLOCK_SAME_GROUP = 3,
};
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t rowIndex = 0;
while (rowIndex < pBlock->info.rows) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
bool samegroup = true;
if (pInfo->hasPrev) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
SColumnInfo *pColInfo = &pColInfoData->info;
char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
if (ret != 0) { // it is a new group
samegroup = false;
break;
}
}
}
while(1) {
if (*newgroup) {
pInfo->currentGroupOffset -= 1;
*newgroup = false;
if (!samegroup || !pInfo->hasPrev) {
pInfo->ignoreCurrentGroup = false;
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if (pInfo->currentGroupOffset > 0) {
pInfo->ignoreCurrentGroup = true;
pInfo->currentGroupOffset -= 1; // now we are in the next group data
rowIndex += 1;
continue;
}
// A new group has arrived according to the result rows, and the group limitation has already reached.
// Let's jump out of current loop and return immediately.
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return BLOCK_NO_GROUP;
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pInfo->groupTotal += 1;
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
// data in current group not allowed, return if current result does not belong to the previous group.And there
// are results exists in current SSDataBlock
if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
return BLOCK_NEW_GROUP;
}
// now we have got the first data block of the next group.
if (pInfo->currentGroupOffset == 0) {
return pBlock;
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
} else { // handle the offset in the same group
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
if (pInfo->ignoreCurrentGroup) {
rowIndex += 1;
continue;
}
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
}
return NULL;
rowIndex += 1;
}
return BLOCK_SAME_GROUP;
}
SSDataBlock* doSLimit(void* param, bool* newgroup) {
......@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
}
SSLimitOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SSDataBlock *pBlock = NULL;
while (1) {
pBlock = skipGroupBlock(pOperator, newgroup);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (*newgroup) { // a new group arrives
pInfo->groupTotal += 1;
pInfo->rowsTotal = 0;
pInfo->currentOffset = pInfo->limit.offset;
}
if (pInfo->pPrevBlock != NULL) {
ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
assert(ret != BLOCK_NEW_GROUP);
assert(pInfo->currentGroupOffset == 0);
if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
if (pInfo->currentOffset == 0) {
break;
}
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
pInfo->pPrevBlock = NULL;
}
// move the remain rows of this data block to the front.
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
assert(pInfo->currentGroupOffset >= 0);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pInfo->currentOffset = 0;
break;
if (pBlock == NULL) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
}
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL;
}
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
pInfo->rowsTotal = pInfo->limit.limit;
ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
if (ret == BLOCK_NEW_GROUP) {
pInfo->pPrevBlock = pBlock;
return pInfo->pRes;
}
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
pOperator->status = OP_EXEC_DONE;
if (pOperator->status == OP_EXEC_DONE) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
} else {
pInfo->rowsTotal += pBlock->info.rows;
// now the number of rows in current group is enough, let's return to the invoke function
if (pInfo->pRes->info.rows > pInfo->threshold) {
return pInfo->pRes;
}
}
return pBlock;
}
......@@ -51,20 +51,18 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3
}
}
// default compareStat is ROW_COMPARE_NO_NEED
if (nBoundCols == 0) { // file input
pBuilder->memRowType = SMEM_ROW_DATA;
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
return TSDB_CODE_SUCCESS;
} else {
float boundRatio = ((float)nBoundCols / (float)nCols);
if (boundRatio < KVRatioKV) {
pBuilder->memRowType = SMEM_ROW_KV;
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
return TSDB_CODE_SUCCESS;
} else if (boundRatio > KVRatioData) {
pBuilder->memRowType = SMEM_ROW_DATA;
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
return TSDB_CODE_SUCCESS;
}
pBuilder->compareStat = ROW_COMPARE_NEED;
......@@ -76,7 +74,6 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3
}
}
pBuilder->dataRowInitLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
if (nRows > 0) {
......@@ -86,7 +83,7 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3
}
for (int i = 0; i < nRows; ++i) {
(pBuilder->rowInfo + i)->dataLen = pBuilder->dataRowInitLen;
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
}
}
......@@ -117,7 +114,7 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
}
for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
if (isspace(pToken->z[k])) continue;
if (pToken->z[k] == ',') {
*next = pTokenEnd;
*time = useconds;
......@@ -460,7 +457,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = tscGetTableSchema(pTableMeta);
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
int32_t dataLen = pBuilder->dataRowInitLen;
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false;
......@@ -809,13 +806,12 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
// allocate memory
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple);
char * tmp = trealloc(pBlkKeyInfo->pKeyTuple, nRealAlloc);
char *tmp = trealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
}
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
......@@ -1593,7 +1589,8 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
ret = tsParseInsertSql(pSql);
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, msg:%s, clear meta cache and retry ", pSql->self, pInsertParam->msg, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
......@@ -1697,7 +1694,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
SInsertStatementParam *pInsertParam = &pCmd->insertParam;
destroyTableNameList(pInsertParam);
pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks);
......@@ -1726,12 +1723,6 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
if (TSDB_CODE_SUCCESS !=
(ret = initMemRowBuilder(&pTableDataBlock->rowBuilder, 0, tinfo.numOfColumns, pTableDataBlock->numOfParams,
pTableDataBlock->boundColumnInfo.allNullLen))) {
goto _error;
}
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
......@@ -1767,6 +1758,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
pSql->res.numOfRows = 0;
code = doPackSendDataBlock(pSql, pInsertParam, pTableMeta, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
goto _error;
}
......@@ -1787,6 +1779,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
}
_error:
pParentSql->res.code = code;
tfree(tokenBuf);
tfree(line);
taos_free_result(pSql);
......
......@@ -86,6 +86,10 @@ typedef struct STscStmt {
return _code; \
} while (0)
#define STMT_CHECK if (pStmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) { \
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); \
}
static int32_t invalidOperationMsg(char* dstBuffer, const char* errMsg) {
return tscInvalidOperationMsg(dstBuffer, errMsg, NULL);
}
......@@ -155,6 +159,22 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
var->i64 = *(int64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UTINYINT:
var->u64 = *(uint8_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_USMALLINT:
var->u64 = *(uint16_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UINT:
var->u64 = *(uint32_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UBIGINT:
var->u64 = *(uint64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_FLOAT:
var->dKey = GET_FLOAT_VAL(tb->buffer);
break;
......@@ -206,6 +226,8 @@ static int normalStmtPrepare(STscStmt* stmt) {
return code;
}
start = i + token.n;
} else if (token.type == TK_ILLEGAL) {
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "invalid sql");
}
i += token.n;
......@@ -259,9 +281,17 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
taosStringBuilderAppendInteger(&sb, var->i64);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
taosStringBuilderAppendUnsignedInteger(&sb, var->u64);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
taosStringBuilderAppendDouble(&sb, var->dKey);
......@@ -1492,6 +1522,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql->isBind = true;
pStmt->pSql = pSql;
pStmt->last = STMT_INIT;
registerSqlObj(pSql);
return pStmt;
}
......@@ -1499,9 +1530,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (sql == NULL) {
tscError("sql is NULL");
......@@ -1547,8 +1576,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.insertParam.numOfParams = 0;
pSql->cmd.batchSize = 0;
registerSqlObj(pSql);
int32_t ret = stmtParseInsertTbTags(pSql, pStmt);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
......@@ -1578,9 +1605,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
SSqlObj* pSql = pStmt->pSql;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1740,6 +1765,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = true;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1748,6 +1774,7 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = false;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1755,6 +1782,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
if (!pStmt->isInsert) {
SNormalStmt* normal = &pStmt->normal;
if (normal->params != NULL) {
......@@ -1776,8 +1806,9 @@ int taos_stmt_close(TAOS_STMT* stmt) {
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
if (pStmt->pSql){
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
}
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
taosArrayDestroy(pStmt->mtb.tags);
tfree(pStmt->mtb.sqlstr);
}
......@@ -1790,9 +1821,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->multiTbInsert) {
......@@ -1821,9 +1850,7 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1854,9 +1881,7 @@ int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX || colIdx < 0) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1889,9 +1914,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
int taos_stmt_add_batch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_BIND && pStmt->last != STMT_BIND_COL) {
......@@ -1918,9 +1941,7 @@ int taos_stmt_reset(TAOS_STMT* stmt) {
int taos_stmt_execute(TAOS_STMT* stmt) {
int ret = 0;
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_ADD_BATCH) {
......@@ -1941,11 +1962,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
if (sql == NULL) {
ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
} else {
if (pStmt->pSql != NULL) {
tscFreeSqlObj(pStmt->pSql);
pStmt->pSql = NULL;
}
taosReleaseRef(tscObjRef, pStmt->pSql->self);
pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql);
ret = taos_errno(pStmt->pSql);
free(sql);
......@@ -1966,7 +1983,6 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
tscError("result has been used already.");
return NULL;
}
TAOS_RES* result = pStmt->pSql;
pStmt->pSql = NULL;
return result;
......@@ -1975,9 +1991,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (insert) *insert = pStmt->isInsert;
......@@ -1987,9 +2001,7 @@ int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlObj* pSql = pStmt->pSql;
......@@ -2006,9 +2018,7 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
......
此差异已折叠。
......@@ -337,16 +337,189 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SRpcMsg* rpcMsg = pSchedMsg->ahandle;
SRpcEpSet* pEpSet = pSchedMsg->thandle;
//static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
// SRpcMsg* rpcMsg = pSchedMsg->ahandle;
// SRpcEpSet* pEpSet = pSchedMsg->thandle;
//
// TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
// SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
// if (pSql == NULL) {
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// assert(pSql->self == handle);
//
// STscObj *pObj = pSql->pTscObj;
// SSqlRes *pRes = &pSql->res;
// SSqlCmd *pCmd = &pSql->cmd;
//
// pSql->rpcRid = -1;
//
// if (pObj->signature != pObj) {
// tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
//
// taosRemoveRef(tscObjRef, handle);
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
// tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
// pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
//
// taosRemoveRef(tscObjRef, handle);
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// if (pEpSet) {
// if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
// if (pCmd->command < TSDB_SQL_MGMT) {
// tscUpdateVgroupInfo(pSql, pEpSet);
// } else {
// tscUpdateMgmtEpSet(pSql, pEpSet);
// }
// }
// }
//
// int32_t cmd = pCmd->command;
//
// // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
// if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
// pSql->cmd.insertParam.schemaAttached = 1;
// }
//
// // single table query error need to be handled here.
// if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
// (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
// rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
//
// // 1. super table subquery
// // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
// if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
// TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
// !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
// (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
// // do nothing in case of super table subquery
// } else {
// pSql->retry += 1;
// tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
//
// pSql->res.code = rpcMsg->code; // keep the previous error code
// if (pSql->retry > pSql->maxRetry) {
// tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
// } else {
// // wait for a little bit moment and then retry
// // todo do not sleep in rpc callback thread, add this process into queue to process
// if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// int32_t duration = getWaitingTimeInterval(pSql->retry);
// taosMsleep(duration);
// }
//
// pSql->retryReason = rpcMsg->code;
// rpcMsg->code = tscRenewTableMeta(pSql, 0);
// // if there is an error occurring, proceed to the following error handling procedure.
// if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
// }
// }
// }
//
// pRes->rspLen = 0;
//
// if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
// tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
// } else {
// pRes->code = rpcMsg->code;
// }
//
// if (pRes->code == TSDB_CODE_SUCCESS) {
// tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
// pSql->retry = 0;
// }
//
// if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
// assert(rpcMsg->msgType == pCmd->msgType + 1);
// pRes->code = rpcMsg->code;
// pRes->rspType = rpcMsg->msgType;
// pRes->rspLen = rpcMsg->contLen;
//
// if (pRes->rspLen > 0 && rpcMsg->pCont) {
// char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
// if (tmp == NULL) {
// pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
// } else {
// pRes->pRsp = tmp;
// memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
// }
// } else {
// tfree(pRes->pRsp);
// }
//
// /*
// * There is not response callback function for submit response.
// * The actual inserted number of points is the first number.
// */
// if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
// SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
// pMsg->code = htonl(pMsg->code);
// pMsg->numOfRows = htonl(pMsg->numOfRows);
// pMsg->affectedRows = htonl(pMsg->affectedRows);
// pMsg->failedRows = htonl(pMsg->failedRows);
// pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
//
// pRes->numOfRows += pMsg->affectedRows;
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
// tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
// } else {
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
// }
// }
//
// if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
// rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
// }
//
// bool shouldFree = tscShouldBeFreed(pSql);
// if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// pRes->code = rpcMsg->code;
// }
// rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
// (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
// }
//
// if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
// tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
// taosRemoveRef(tscObjRef, handle);
// }
//
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
//}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -357,15 +530,12 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -377,8 +547,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -409,7 +577,8 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) {
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)
|| (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY))) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
......@@ -432,8 +601,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
}
......@@ -511,29 +678,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg));
memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg));
schedMsg.ahandle = (void*)rpcMsgCopy;
SRpcEpSet* pEpSetCopy = NULL;
if (pEpSet != NULL) {
pEpSetCopy = calloc(1, sizeof(SRpcEpSet));
memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet));
}
schedMsg.thandle = (void*)pEpSetCopy;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int doBuildAndSendMsg(SSqlObj *pSql) {
......@@ -698,6 +842,11 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
tableSerialize = totalTables * sizeof(STableIdInfo);
}
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) {
srcColListSize += pCond->len;
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize +
exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
}
......@@ -880,16 +1029,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
query.vgId = pTableMeta->vgId;
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
......@@ -1383,7 +1532,6 @@ int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
}
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
pCmd->payloadLen = sizeof(SShowMsg) + 100;
......@@ -1406,9 +1554,9 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
pthread_mutex_unlock(&pObj->mutex);
char *p = cloneCurrentDBName(pSql);
tstrncpy(pShowMsg->db, p, sizeof(pShowMsg->db));
tfree(p);
} else {
tNameGetFullDbName(&pTableMetaInfo->name, pShowMsg->db);
}
......@@ -1744,7 +1892,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
return pRes->code;
}
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
} else {
tscResetForNextRetrieve(pRes);
}
......@@ -2659,7 +2807,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE))) {
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
}
if (pSql->pSubscription != NULL) {
......@@ -2824,7 +2972,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen);
pNew->fp = fp;
pNew->param = (void *)pSql->self;
pNew->param = (void *)pSql->rootObj->self;
tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self);
......@@ -2849,7 +2997,10 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
}
}
taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity);
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
tfree(pTableMetaInfo->pTableMeta);
pTableMetaInfo->tableMetaCapacity = 0;
}
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
if (pMeta && pMeta->id.uid > 0) {
......@@ -2860,6 +3011,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
}
}
tscDebug("0x%"PRIx64 " %s retrieve tableMeta from cache, numOfCols:%d, numOfTags:%d", pSql->self, name, pMeta->tableInfo.numOfColumns, pMeta->tableInfo.numOfTags);
return TSDB_CODE_SUCCESS;
}
......@@ -2967,6 +3120,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pSql->rootObj->retryReason = pSql->retryReason;
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
......
......@@ -874,6 +874,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->resColumnId = TSDB_RES_COL_ID;
......@@ -982,6 +983,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->rootObj = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
......
......@@ -679,6 +679,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......
......@@ -127,6 +127,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
pSql->rootObj = pSql;
pSub->pSql = pSql;
SSqlCmd* pCmd = &pSql->cmd;
......
......@@ -15,8 +15,9 @@
#define _GNU_SOURCE
#include "os.h"
#include "texpr.h"
#include "tsched.h"
#include "qTsbuf.h"
#include "tcompare.h"
#include "tscLog.h"
......@@ -859,6 +860,40 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true;
}
bool tscReparseSql(SSqlObj *sql, int32_t code){
if (!((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && sql->retry < sql->maxRetry)) {
return true;
}
tscFreeSubobj(sql);
tfree(sql->pSubs);
sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", sql->self,
tstrerror(code), sql->retry);
tscResetSqlCmd(&sql->cmd, true);
code = tsParseSql(sql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return false;
}
if (code != TSDB_CODE_SUCCESS) {
sql->res.code = code;
tscAsyncResultOnError(sql);
return false;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&sql->cmd);
executeQuery(sql, pQueryInfo);
return false;
}
static void setTidTagType(SJoinSupporter* p, uint8_t type) {
for (int32_t i = 0; i < p->num; ++i) {
STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize);
......@@ -1101,6 +1136,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
......@@ -1118,6 +1157,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1255,6 +1298,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
......@@ -1271,6 +1318,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1400,6 +1451,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
......@@ -1727,6 +1782,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
// tscFieldInfoUpdateOffset(pQueryInfo);
}
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
......@@ -1746,6 +1802,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
......@@ -1761,6 +1821,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
......@@ -2034,17 +2098,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscAsyncResultOnError(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param;
tfree(pSupport->localBuffer);
tfree(pSupport);
tscFreeRetrieveSup(pSub);
taos_free_result(pSub);
}
......@@ -2267,7 +2328,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
destroySup(pSup);
taos_free_result(pSql);
parent->res.code = code;
parent->res.code = c;
tscAsyncResultOnError(parent);
return;
}
......@@ -2397,6 +2458,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
} else {
SSchema ss = {.type = (uint8_t)pCol->info.type, .bytes = pCol->info.bytes, .colId = (int16_t)pCol->columnIndex};
tscColumnListInsert(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid, &ss);
int32_t ti = tscColumnExists(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid);
assert(ti >= 0);
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, ti);
tscColumnCopy(x, pCol);
}
}
}
......@@ -2424,6 +2489,30 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
return terrno;
}
typedef struct SPair {
int32_t first;
int32_t second;
} SPair;
static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg;
for (int32_t i = p->first; i < p->second; ++i) {
if (i >= pSql->subState.numOfSub) {
tfree(p);
return;
}
SSqlObj* pSub = pSql->pSubs[i];
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
}
tfree(p);
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -2546,19 +2635,44 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
doCleanupSubqueries(pSql, i);
return pRes->code;
}
for(int32_t j = 0; j < pState->numOfSub; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
// concurrently sent the query requests.
const int32_t MAX_REQUEST_PER_TASK = 8;
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
assert(numOfTasks >= 1);
int32_t num;
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
num = MAX_REQUEST_PER_TASK;
} else {
num = pState->numOfSub / numOfTasks + 1;
}
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doSendQueryReqs;
schedMsg.ahandle = (void*)pSql;
schedMsg.thandle = NULL;
SPair* p = calloc(1, sizeof(SPair));
p->first = j * num;
if (j == numOfTasks - 1) {
p->second = pState->numOfSub;
} else {
p->second = (j + 1) * num;
}
schedMsg.msg = p;
taosScheduleTask(tscQhandle, &schedMsg);
}
return TSDB_CODE_SUCCESS;
}
static void tscFreeRetrieveSup(SSqlObj *pSql) {
void tscFreeRetrieveSup(SSqlObj *pSql) {
SRetrieveSupport *trsupport = pSql->param;
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
......@@ -2716,33 +2830,36 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
SSqlObj *userSql = pParentSql->rootObj;
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) {
tscFreeRetrieveSup(pParentSql);
}
tscFreeSubobj(userSql);
tfree(userSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", userSql->self,
tstrerror(code), userSql->retry);
code = tsParseSql(pParentSql, true);
tscResetSqlCmd(&userSql->cmd, true);
code = tsParseSql(userSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
userSql->res.code = code;
tscAsyncResultOnError(userSql);
return;
}
executeQuery(pParentSql, pQueryInfo);
pQueryInfo = tscGetQueryInfo(&userSql->cmd);
executeQuery(userSql, pQueryInfo);
} else {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
}
......@@ -2812,7 +2929,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
pParentSql->self, pState->numOfSub, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
tscClearInterpInfo(pPQueryInfo);
code = tscCreateGlobalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pMerger, pParentSql->self);
pParentSql->res.code = code;
......@@ -2927,7 +3043,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && /*tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&*/ !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
......@@ -3351,6 +3467,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
}
if (numOfRes == 0) { // no result any more, free all subquery objects
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
return;
}
......@@ -3397,6 +3514,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes);
pRes->dataConverted = pRes1->dataConverted;
data += bytes * numOfRes;
}
......@@ -3422,7 +3541,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);
pRes->data = pFilePage->data;
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
......
......@@ -29,6 +29,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "httpInt.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
......@@ -60,6 +61,21 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%d", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%d", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRId64, *(uint64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
......@@ -369,6 +385,27 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
return false;
}
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_TS) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM) {
return i;
}
}
return -1;
}
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
......@@ -654,9 +691,13 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}
if (convertNchar) {
pRes->dataConverted = true;
}
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) {
return;
......@@ -669,7 +710,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, true);
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
}
}
......@@ -1208,7 +1249,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
pOutput->precision = pSqlObjList[0]->res.precision;
SSchema* schema = NULL;
......@@ -1504,7 +1544,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
pCmd->allocSize = 0;
......@@ -2074,6 +2113,22 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return f;
}
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
if (pQueryInfo->fieldsInfo.numOfOutput <= 0 || pQueryInfo->fieldsInfo.internalField == NULL) {
return 0;
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (!pField->visible) {
return i;
}
}
return pQueryInfo->fieldsInfo.numOfOutput;
}
SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
assert(pFieldInfo != NULL);
pFieldInfo->numOfOutput++;
......@@ -3043,6 +3098,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->multigroupResult = true;
}
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
......@@ -3054,7 +3110,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
}
tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) {
......@@ -3136,6 +3191,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
......@@ -3373,6 +3429,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->row = 0;
pRes->numOfRows = 0;
pRes->dataConverted = false;
}
void tscInitResForMerge(SSqlRes* pRes) {
......@@ -3402,6 +3459,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->rootObj = pSql->rootObj;
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd;
......@@ -3482,7 +3540,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd;
......@@ -3534,8 +3593,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
pNewQueryInfo->distinct = pQueryInfo->distinct;
if (pNewQueryInfo->buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -3746,8 +3807,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
int32_t index = ps->subqueryIndex;
bool ret = subAndCheckDone(pSql, pParentSql, index);
tfree(ps);
pSql->param = NULL;
tscFreeRetrieveSup(pSql);
if (!ret) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
......@@ -3757,35 +3817,57 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
SSqlObj *rootObj = pParentSql->rootObj;
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && rootObj->retry < rootObj->maxRetry)) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
return;
}
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", rootObj->self,
tstrerror(code), rootObj->retry);
code = tsParseSql(pParentSql, true);
tscResetSqlCmd(&rootObj->cmd, true);
code = tsParseSql(rootObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
rootObj->res.code = code;
tscAsyncResultOnError(rootObj);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
executeQuery(pParentSql, pQueryInfo);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&rootObj->cmd);
executeQuery(rootObj, pQueryInfo);
return;
}
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
SSqlObj* pParentSql = ps->pParentSql;
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
}
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
}
......@@ -3804,9 +3886,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
assert(pSql->subState.numOfSub == 0);
pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream);
assert(pSql->pSubs == NULL);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
assert(pSql->subState.states == NULL);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
code = pthread_mutex_init(&pSql->subState.mutex, NULL);
......@@ -3831,7 +3915,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->fetchFp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->rootObj = pSql->rootObj;
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
......@@ -4008,6 +4097,31 @@ int32_t tscInvalidOperationMsg(char* msg, const char* additionalInfo, const char
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql) {
const char* msgFormat1 = "%s:%s";
const char* msgFormat2 = "%s:\'%s\' (%s)";
const char* msgFormat3 = "%s:\'%s\'";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(errMsg != NULL);
sprintf(dstBuffer, msgFormat1, tstrerror(code), errMsg);
return code;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (errMsg != NULL) {
sprintf(dstBuffer, msgFormat2, tstrerror(code), buf, errMsg);
} else {
sprintf(dstBuffer, msgFormat3, tstrerror(code), buf); // no additional information for invalid sql error
}
return code;
}
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
......@@ -4367,8 +4481,10 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
size_t sz = 0;
STableMeta* pChild = *ppChild;
STableMeta* pChild1;
taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz);
if(NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
tfree(p);
}
// tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table.
......@@ -4378,9 +4494,9 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
int32_t tableMetaSize = sizeof(STableMeta) + totalBytes;
if (*tableMetaCapacity < tableMetaSize) {
pChild1 = realloc(pChild, tableMetaSize);
if(pChild1 == NULL)
if(pChild1 == NULL)
return -1;
pChild = pChild1;
pChild = pChild1;
*tableMetaCapacity = (size_t)tableMetaSize;
}
......@@ -4652,6 +4768,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
......@@ -4726,7 +4843,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
}
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
pQueryAttr->fillVal = calloc(pQueryInfo->numOfFillVal, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryInfo->numOfFillVal * sizeof(int64_t));
}
......@@ -4924,3 +5041,31 @@ void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
taosHashRemove(tscTableMetaMap, fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap));
}
char* cloneCurrentDBName(SSqlObj* pSql) {
char *p = NULL;
HttpContext *pCtx = NULL;
pthread_mutex_lock(&pSql->pTscObj->mutex);
STscObj *pTscObj = pSql->pTscObj;
switch (pTscObj->from) {
case TAOS_REQ_FROM_HTTP:
pCtx = pSql->param;
if (pCtx && pCtx->db[0] != '\0') {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] = {0};
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
assert(len <= sizeof(db));
p = strdup(db);
}
break;
default:
break;
}
if (p == NULL) {
p = strdup(pSql->pTscObj->db);
}
pthread_mutex_unlock(&pSql->pTscObj->mutex);
return p;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
Subproject commit a44ec1ca493ad01b2bf825b6418f69e11f548206
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
Subproject commit b62a26ecc164a310104df57691691b237e091c89
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册