提交 9af5765a 编写于 作者: sangshuduo's avatar sangshuduo

merge with master branch.

......@@ -238,7 +238,7 @@ pipeline {
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo
echo '' |./taosdemo -c /etc/taos
'''
sh '''
cd ${WKC}/tests/gotest
......@@ -256,21 +256,19 @@ pipeline {
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
}
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
'''
// sh '''
// cd ${WKC}/tests/pytest
// rm -rf /var/lib/taos/*
// rm -rf /var/log/taos/*
// ./handle_crash_gen_val_log.sh
// '''
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.1.7.1")
SET(TD_VER_NUMBER "2.2.0.1")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
Subproject commit ceda5bf9fcd7836509ac97dcc0056b3f1dd48cc5
Subproject commit 11c1060d4f917dd799ae628b131db5d6a5ef6954
......@@ -2,18 +2,18 @@
## <a class="anchor" id="intro"></a>TDengine 简介
TDengine是涛思数据面对高速增长的物联网大数据市场和技术挑战推出的创新性的大数据处理产品,它不依赖任何第三方软件,也不是优化或包装了一个开源的数据库或流式计算产品,而是在吸取众多传统关系型数据库、NoSQL数据库、流式计算引擎、消息队列等软件的优点之后自主开发的产品,在时序空间大数据处理上,有着自己独到的优势。
TDengine 是涛思数据面对高速增长的物联网大数据市场和技术挑战推出的创新性的大数据处理产品,它不依赖任何第三方软件,也不是优化或包装了一个开源的数据库或流式计算产品,而是在吸取众多传统关系型数据库、NoSQL 数据库、流式计算引擎、消息队列等软件的优点之后自主开发的产品,在时序空间大数据处理上,有着自己独到的优势。
TDengine的模块之一是时序数据库。但除此之外,为减少研发的复杂度、系统维护的难度,TDengine还提供缓存、消息队列、订阅、流式计算等功能,为物联网、工业互联网大数据的处理提供全栈的技术方案,是一个高效易用的物联网大数据平台。与Hadoop等典型的大数据平台相比,它具有如下鲜明的特点:
TDengine 的模块之一是时序数据库。但除此之外,为减少研发的复杂度、系统维护的难度,TDengine 还提供缓存、消息队列、订阅、流式计算等功能,为物联网、工业互联网大数据的处理提供全栈的技术方案,是一个高效易用的物联网大数据平台。与 Hadoop 等典型的大数据平台相比,它具有如下鲜明的特点:
* __10倍以上的性能提升__:定义了创新的数据存储结构,单核每秒能处理至少2万次请求,插入数百万个数据点,读出一千万以上数据点,比现有通用数据库快十倍以上。
* __硬件或云服务成本降至1/5__:由于超强性能,计算资源不到通用大数据方案的1/5;通过列式存储和先进的压缩算法,存储空间不到通用数据库的1/10。
* __全栈时序数据处理引擎__:将数据库、消息队列、缓存、流式计算等功能融为一体,应用无需再集成Kafka/Redis/HBase/Spark/HDFS等软件,大幅降低应用开发和维护的复杂度成本。
* __强大的分析功能__:无论是十年前还是一秒钟前的数据,指定时间范围即可查询。数据可在时间轴上或多个设备上进行聚合。即席查询可通过Shell, Python, R, MATLAB随时进行。
* __与第三方工具无缝连接__:不用一行代码,即可与Telegraf, Grafana, EMQ, HiveMQ, Prometheus, MATLAB, R等集成。后续将支持OPC, Hadoop, Spark等, BI工具也将无缝连接。
* __零运维成本、零学习成本__:安装集群简单快捷,无需分库分表,实时备份。类似标准SQL,支持RESTful, 支持Python/Java/C/C++/C#/Go/Node.js, 与MySQL相似,零学习成本。
* __10 倍以上的性能提升__:定义了创新的数据存储结构,单核每秒能处理至少 2 万次请求,插入数百万个数据点,读出一千万以上数据点,比现有通用数据库快十倍以上。
* __硬件或云服务成本降至 1/5__:由于超强性能,计算资源不到通用大数据方案的 1/5;通过列式存储和先进的压缩算法,存储空间不到通用数据库的 1/10。
* __全栈时序数据处理引擎__:将数据库、消息队列、缓存、流式计算等功能融为一体,应用无需再集成 Kafka/Redis/HBase/Spark/HDFS 等软件,大幅降低应用开发和维护的复杂度成本。
* __强大的分析功能__:无论是十年前还是一秒钟前的数据,指定时间范围即可查询。数据可在时间轴上或多个设备上进行聚合。即席查询可通过 Shell, Python, R, MATLAB 随时进行。
* __与第三方工具无缝连接__:不用一行代码,即可与 Telegraf, Grafana, EMQ, HiveMQ, Prometheus, MATLAB, R 等集成。后续将支持 OPC, Hadoop, Spark 等,BI 工具也将无缝连接。
* __零运维成本、零学习成本__:安装集群简单快捷,无需分库分表,实时备份。类标准 SQL,支持 RESTful,支持 Python/Java/C/C++/C#/Go/Node.js, 与 MySQL 相似,零学习成本。
采用TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。但需要指出的是,因充分利用了物联网时序数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM等通用型数据。
采用 TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。但需要指出的是,因充分利用了物联网时序数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。
![TDengine技术生态图](page://images/eco_system.png)
<center>图 1. TDengine技术生态图</center>
......@@ -21,42 +21,47 @@ TDengine的模块之一是时序数据库。但除此之外,为减少研发的
## <a class="anchor" id="scenes"></a>TDengine 总体适用场景
作为一个IOT大数据平台,TDengine的典型适用场景是在IOT范畴,而且用户有一定的数据量。本文后续的介绍主要针对这个范畴里面的系统。范畴之外的系统,比如CRM,ERP等,不在本文讨论范围内。
作为一个 IOT 大数据平台,TDengine 的典型适用场景是在 IOT 范畴,而且用户有一定的数据量。本文后续的介绍主要针对这个范畴里面的系统。范畴之外的系统,比如 CRM,ERP 等,不在本文讨论范围内。
### 数据源特点和需求
从数据源角度,设计人员可以从下面几个角度分析TDengine在目标应用系统里面的适用性。
从数据源角度,设计人员可以从下面几个角度分析 TDengine 在目标应用系统里面的适用性。
|数据源特点和需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|总体数据量巨大| | | √ |TDengine在容量方面提供出色的水平扩展功能,并且具备匹配高压缩的存储结构,达到业界最优的存储效率。|
|数据输入速度偶尔或者持续巨大| | | √ | TDengine的性能大大超过同类产品,可以在同样的硬件环境下持续处理大量的输入数据,并且提供很容易在用户环境里面运行的性能评估工具。|
|数据源数目巨大| | | √ |TDengine设计中包含专门针对大量数据源的优化,包括数据的写入和查询,尤其适合高效处理海量(千万或者更多量级)的数据源。|
|总体数据量巨大| | | √ | TDengine 在容量方面提供出色的水平扩展功能,并且具备匹配高压缩的存储结构,达到业界最优的存储效率。|
|数据输入速度偶尔或者持续巨大| | | √ | TDengine 的性能大大超过同类产品,可以在同样的硬件环境下持续处理大量的输入数据,并且提供很容易在用户环境里面运行的性能评估工具。|
|数据源数目巨大| | | √ | TDengine 设计中包含专门针对大量数据源的优化,包括数据的写入和查询,尤其适合高效处理海量(千万或者更多量级)的数据源。|
### 系统架构要求
|系统架构要求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求简单可靠的系统架构| | | √ |TDengine的系统架构非常简单可靠,自带消息队列,缓存,流式计算,监控等功能,无需集成额外的第三方产品。|
|要求容错和高可靠| | | √ |TDengine的集群功能,自动提供容错灾备等高可靠功能。|
|标准化规范| | | √ |TDengine使用标准的SQL语言提供主要功能,遵守标准化规范。|
|要求简单可靠的系统架构| | | √ | TDengine 的系统架构非常简单可靠,自带消息队列,缓存,流式计算,监控等功能,无需集成额外的第三方产品。|
|要求容错和高可靠| | | √ | TDengine 的集群功能,自动提供容错灾备等高可靠功能。|
|标准化规范| | | √ | TDengine 使用标准的SQL语言提供主要功能,遵守标准化规范。|
### 系统功能需求
|系统功能需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求完整的内置数据处理算法| | √ | |TDengine的实现了通用的数据处理算法,但是还没有做到妥善处理各行各业的所有要求,因此特殊类型的处理还需要应用层面处理。|
|需要大量的交叉查询处理| | √ | |这种类型的处理更多应该用关系型数据系统处理,或者应该考虑TDengine和关系型数据系统配合实现系统功能。|
|要求完整的内置数据处理算法| | √ | | TDengine 的实现了通用的数据处理算法,但是还没有做到妥善处理各行各业的所有要求,因此特殊类型的处理还需要应用层面处理。|
|需要大量的交叉查询处理| | √ | |这种类型的处理更多应该用关系型数据系统处理,或者应该考虑 TDengine 和关系型数据系统配合实现系统功能。|
### 系统性能需求
|系统性能需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求较大的总体处理能力| | | √ |TDengine的集群功能可以轻松地让多服务器配合达成处理能力的提升。|
|要求高速处理数据 | | | √ |TDengine的专门为IOT优化的存储和数据处理的设计,一般可以让系统得到超出同类产品多倍数的处理速度提升。|
|要求快速处理小粒度数据| | | √ |这方面TDengine性能可以完全对标关系型和NoSQL型数据处理系统。|
|要求较大的总体处理能力| | | √ | TDengine 的集群功能可以轻松地让多服务器配合达成处理能力的提升。|
|要求高速处理数据 | | | √ | TDengine 的专门为 IOT 优化的存储和数据处理的设计,一般可以让系统得到超出同类产品多倍数的处理速度提升。|
|要求快速处理小粒度数据| | | √ |这方面 TDengine 性能可以完全对标关系型和 NoSQL 型数据处理系统。|
### 系统维护需求
|系统维护需求|不适用|可能适用|非常适用|简单说明|
|---|---|---|---|---|
|要求系统可靠运行| | | √ |TDengine的系统架构非常稳定可靠,日常维护也简单便捷,对维护人员的要求简洁明了,最大程度上杜绝人为错误和事故。|
|要求系统可靠运行| | | √ | TDengine 的系统架构非常稳定可靠,日常维护也简单便捷,对维护人员的要求简洁明了,最大程度上杜绝人为错误和事故。|
|要求运维学习成本可控| | | √ |同上。|
|要求市场有大量人才储备| √ | | |TDengine作为新一代产品,目前人才市场里面有经验的人员还有限。但是学习成本低,我们作为厂家也提供运维的培训和辅助服务。|
|要求市场有大量人才储备| √ | | | TDengine 作为新一代产品,目前人才市场里面有经验的人员还有限。但是学习成本低,我们作为厂家也提供运维的培训和辅助服务。|
......@@ -102,6 +102,12 @@ elif echo $osinfo | grep -qwi "centos" ; then
elif echo $osinfo | grep -qwi "fedora" ; then
# echo "This is fedora system"
os_type=2
elif echo $osinfo | grep -qwi "Linx" ; then
# echo "This is Linx system"
os_type=1
service_mod=0
initd_mod=0
service_config_dir="/etc/systemd/system"
else
echo " osinfo: ${osinfo}"
echo " This is an officially unverified linux system,"
......
......@@ -129,7 +129,11 @@ function install_lib() {
${csudo} ln -s ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
if [ "$osType" != "Darwin" ]; then
${csudo} ldconfig
else
${csudo} update_dyld_shared_cache
fi
}
function install_header() {
......
......@@ -19,35 +19,35 @@ else
fi
# Dynamic directory
data_dir="/var/lib/taos"
if [ "$osType" != "Darwin" ]; then
data_dir="/var/lib/taos"
log_dir="/var/log/taos"
else
log_dir=~/TDengine/log
fi
data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log"
cfg_install_dir="/etc/taos"
cfg_install_dir="/etc/taos"
if [ "$osType" != "Darwin" ]; then
bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib"
lib64_link_dir="/usr/lib64"
inc_link_dir="/usr/include"
install_main_dir="/usr/local/taos"
bin_dir="/usr/local/taos/bin"
else
data_dir="/usr/local/var/lib/taos"
log_dir="/usr/local/var/log/taos"
cfg_install_dir="/usr/local/etc/taos"
bin_link_dir="/usr/local/bin"
lib_link_dir="/usr/local/lib"
inc_link_dir="/usr/local/include"
fi
#install main path
install_main_dir="/usr/local/taos"
install_main_dir="/usr/local/Cellar/tdengine/${verNumber}"
# old bin dir
bin_dir="/usr/local/taos/bin"
bin_dir="/usr/local/Cellar/tdengine/${verNumber}/bin"
fi
service_config_dir="/etc/systemd/system"
......@@ -59,12 +59,11 @@ GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo"
fi
if [ "$osType" != "Darwin" ]; then
if command -v sudo > /dev/null; then
csudo="sudo"
fi
initd_mod=0
service_mod=2
if pidof systemd &> /dev/null; then
......@@ -138,15 +137,15 @@ function install_main_path() {
function install_bin() {
# Remove links
${csudo} rm -f ${bin_link_dir}/taos || :
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/set_core || :
fi
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/perfMonitor || :
${csudo} rm -f ${bin_link_dir}/set_core || :
${csudo} rm -f ${bin_link_dir}/rmtaos || :
fi
${csudo} cp -r ${binary_dir}/build/bin/* ${install_main_dir}/bin
${csudo} cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin
......@@ -162,18 +161,17 @@ function install_bin() {
#Make link
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo} ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || :
[ -x ${install_main_dir}/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
fi
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || :
else
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
fi
}
......@@ -220,7 +218,7 @@ function install_jemalloc() {
fi
if [ -d /etc/ld.so.conf.d ]; then
${csudo} echo "/usr/local/lib" > /etc/ld.so.conf.d/jemalloc.conf
echo "/usr/local/lib" | ${csudo} tee /etc/ld.so.conf.d/jemalloc.conf
${csudo} ldconfig
else
echo "/etc/ld.so.conf.d not found!"
......@@ -245,8 +243,9 @@ function install_lib() {
${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
fi
else
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
......@@ -259,10 +258,14 @@ function install_lib() {
function install_header() {
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
fi
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
if [ "$osType" != "Darwin" ]; then
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
fi
}
function install_config() {
......@@ -270,23 +273,20 @@ function install_config() {
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] &&
${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/*
fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
if [ "$osType" != "Darwin" ]; then ${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
fi
}
function install_log() {
${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
else
mkdir -p ${log_dir} && chmod 777 ${log_dir}
fi
${csudo} ln -s ${log_dir} ${install_main_dir}/log
}
......@@ -307,7 +307,6 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!"
fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
}
......@@ -490,15 +489,12 @@ function install_TDengine() {
install_main_path
if [ "$osType" != "Darwin" ]; then
install_data
fi
install_log
install_header
install_lib
install_connector
install_examples
install_bin
if [ "$osType" != "Darwin" ]; then
......
name: tdengine
base: core18
version: '2.1.7.1'
version: '2.2.0.1'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.7.1
- usr/lib/libtaos.so.2.2.0.1
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -4,6 +4,8 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/plugins/http/inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
......
......@@ -58,6 +58,7 @@ typedef struct SRetrieveSupport {
int32_t subqueryIndex; // index of current vnode in vnode list
struct SSqlObj *pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t localBufferSize;
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
......
......@@ -36,7 +36,7 @@ extern "C" {
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
......@@ -55,7 +55,7 @@ typedef struct STidTags {
#pragma pack(pop)
typedef struct SJoinSupporter {
SSqlObj* pObj; // parent SqlObj
int64_t pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
SLimitVal limit; // limit info
......@@ -190,6 +190,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
......@@ -284,7 +285,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
/**
* The create object function must be successful expect for the out of memory issue.
*
......@@ -360,6 +361,8 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -38,6 +38,11 @@ extern "C" {
#include "qUtil.h"
#include "tcmdtype.h"
typedef enum {
TAOS_REQ_FROM_SHELL,
TAOS_REQ_FROM_HTTP
} SReqOrigin;
// forward declaration
struct SSqlInfo;
......@@ -307,6 +312,7 @@ typedef struct {
char * data;
TAOS_ROW tsrow;
TAOS_ROW urow;
bool dataConverted;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex* pColumnIndex;
......@@ -340,6 +346,7 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
SReqOrigin from;
} STscObj;
typedef struct SSubqueryState {
......@@ -375,6 +382,7 @@ typedef struct SSqlObj {
SSubqueryState subState;
struct SSqlObj **pSubs;
struct SSqlObj *rootObj;
int64_t metaRid;
int64_t svgroupRid;
......@@ -439,7 +447,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
......@@ -486,6 +494,7 @@ bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql);
int32_t tscInvalidOperationMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
......
此差异已折叠。
......@@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp;
pSql->fetchFp = fp;
pSql->rootObj = pSql;
registerSqlObj(pSql);
......@@ -60,17 +61,23 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->resColumnId = TSDB_RES_COL_ID;
taosAcquireRef(tscObjRef, pSql->self);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscAsyncResultOnError(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
taosReleaseRef(tscObjRef, pSql->self);
}
// TODO return the correct error code to client in tscQueueAsyncError
......@@ -168,6 +175,9 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} else {
pRes->code = numOfRows;
}
if (pRes->code == TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
}
tscAsyncResultOnError(pSql);
return;
......@@ -358,15 +368,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
tscImportDataFromFile(pSql);
......
......@@ -35,6 +35,7 @@ typedef struct SCompareParam {
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
......@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(*hasPrev) = true;
}
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
// output value to multiple rows
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
if (numOfRows <= 1) {
return ;
return;
}
for (int32_t k = 0; k < numOfOutput; ++k) {
......@@ -574,31 +577,20 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
continue;
}
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
char* src = pCtx[k].pOutput;
char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
for (int32_t i = 0; i < inc; ++i) {
pCtx[k].pOutput += pCtx[k].outputBytes;
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes);
// Let's start from the second row, as the first row has result value already.
for (int32_t i = 1; i < numOfRows; ++i) {
memcpy(dst, src, (size_t)pCtx[k].outputBytes);
dst += pCtx[k].outputBytes;
}
}
}
static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
add[i] = pCtx[i].pInput;
pCtx[i].size = 1;
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -609,16 +601,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
} else {
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} else {
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor
}
}
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
......@@ -626,15 +617,30 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
} else {
aAggs[functionId].xFinalize(&pCtx[j]);
}
}
}
aAggs[functionId].xFinalize(&pCtx[j]);
static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
addrPtr[i] = pCtx[i].pInput;
pCtx[i].size = 1;
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
} else {
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
......@@ -643,7 +649,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
pCtx[j].ptsOutputBuf = pCtx[0].pOutput;
if(j>0) pCtx[j].ptsOutputBuf = pCtx[j-1].pOutput;
}
}
......@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
}
} else {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
}
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
......@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
{
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].pInput = add[i];
pCtx[i].pInput = addrPtr[i];
}
}
tfree(add);
tfree(addrPtr);
}
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
......@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
bool sameGroup = true;
if (pInfo->hasPrev) {
// todo refactor extract method
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
// if this row belongs to current result set group
......@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
break;
}
bool sameGroup = true;
if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) {
sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup && !pAggInfo->multiGroupResults) {
*newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock;
......@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
if (handleData) { // data in current group is all handled
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pAggInfo->binfo.pRes->info.rows += numOfRows;
pAggInfo->binfo.pRes->info.rows += numOfRows;
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
}
......@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
return (pRes->info.rows != 0)? pRes:NULL;
}
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSLimitOperatorInfo *pInfo = pOperator->info;
assert(pInfo->currentGroupOffset >= 0);
static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
if (pInfo->currentOffset > 0) {
pInfo->currentOffset -= 1;
} else {
// discard the data rows in current group
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
for (int32_t i = 0; i < num1; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
SColumnInfo *pColInfo = &pColInfoData->info;
char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
memcpy(pDst, pSrc, pColInfo->bytes);
}
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
pInfo->rowsTotal += 1;
pInfo->pRes->info.rows += 1;
}
}
}
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
int32_t total = pResultBlock->info.rows + numOfRows;
size_t num = taosArrayGetSize(pResultBlock->pDataBlock);
for (int32_t i = 0; i < num; ++i) {
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
if (tmp != NULL) {
pInfoData->pData = tmp;
} else {
// todo handle the malloc failure
}
return pBlock;
pInfo->capacity = total;
pInfo->threshold = (int64_t) (total * 0.8);
}
}
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
enum {
BLOCK_NEW_GROUP = 1,
BLOCK_NO_GROUP = 2,
BLOCK_SAME_GROUP = 3,
};
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t rowIndex = 0;
while(1) {
if (*newgroup) {
pInfo->currentGroupOffset -= 1;
*newgroup = false;
while (rowIndex < pBlock->info.rows) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
bool samegroup = true;
if (pInfo->hasPrev) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
SColumnInfo *pColInfo = &pColInfoData->info;
char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
if (ret != 0) { // it is a new group
samegroup = false;
break;
}
}
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (!samegroup || !pInfo->hasPrev) {
pInfo->ignoreCurrentGroup = false;
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
if (pBlock == NULL) {
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if (pInfo->currentGroupOffset > 0) {
pInfo->ignoreCurrentGroup = true;
pInfo->currentGroupOffset -= 1; // now we are in the next group data
rowIndex += 1;
continue;
}
// A new group has arrived according to the result rows, and the group limitation has already reached.
// Let's jump out of current loop and return immediately.
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
return BLOCK_NO_GROUP;
}
pInfo->groupTotal += 1;
// data in current group not allowed, return if current result does not belong to the previous group.And there
// are results exists in current SSDataBlock
if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
return BLOCK_NEW_GROUP;
}
// now we have got the first data block of the next group.
if (pInfo->currentGroupOffset == 0) {
return pBlock;
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
} else { // handle the offset in the same group
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
if (pInfo->ignoreCurrentGroup) {
rowIndex += 1;
continue;
}
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
}
rowIndex += 1;
}
return NULL;
return BLOCK_SAME_GROUP;
}
SSDataBlock* doSLimit(void* param, bool* newgroup) {
......@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
}
SSLimitOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SSDataBlock *pBlock = NULL;
while (1) {
pBlock = skipGroupBlock(pOperator, newgroup);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (*newgroup) { // a new group arrives
pInfo->groupTotal += 1;
pInfo->rowsTotal = 0;
pInfo->currentOffset = pInfo->limit.offset;
}
assert(pInfo->currentGroupOffset == 0);
if (pInfo->pPrevBlock != NULL) {
ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
assert(ret != BLOCK_NEW_GROUP);
if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
if (pInfo->currentOffset == 0) {
break;
pInfo->pPrevBlock = NULL;
}
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
assert(pInfo->currentGroupOffset >= 0);
// move the remain rows of this data block to the front.
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
if (pBlock == NULL) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
pInfo->currentOffset = 0;
break;
}
ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
if (ret == BLOCK_NEW_GROUP) {
pInfo->pPrevBlock = pBlock;
return pInfo->pRes;
}
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL;
if (pOperator->status == OP_EXEC_DONE) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
pInfo->rowsTotal = pInfo->limit.limit;
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
pOperator->status = OP_EXEC_DONE;
// now the number of rows in current group is enough, let's return to the invoke function
if (pInfo->pRes->info.rows > pInfo->threshold) {
return pInfo->pRes;
}
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
} else {
pInfo->rowsTotal += pBlock->info.rows;
}
return pBlock;
}
......@@ -114,7 +114,7 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
}
for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
if (isspace(pToken->z[k])) continue;
if (pToken->z[k] == ',') {
*next = pTokenEnd;
*time = useconds;
......@@ -1589,7 +1589,8 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
ret = tsParseInsertSql(pSql);
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, msg:%s, clear meta cache and retry ", pSql->self, pInsertParam->msg, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
......@@ -1757,6 +1758,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
pSql->res.numOfRows = 0;
code = doPackSendDataBlock(pSql, pInsertParam, pTableMeta, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
goto _error;
}
......@@ -1777,6 +1779,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
}
_error:
pParentSql->res.code = code;
tfree(tokenBuf);
tfree(line);
taos_free_result(pSql);
......
......@@ -86,6 +86,10 @@ typedef struct STscStmt {
return _code; \
} while (0)
#define STMT_CHECK if (pStmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) { \
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); \
}
static int32_t invalidOperationMsg(char* dstBuffer, const char* errMsg) {
return tscInvalidOperationMsg(dstBuffer, errMsg, NULL);
}
......@@ -155,6 +159,22 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
var->i64 = *(int64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UTINYINT:
var->u64 = *(uint8_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_USMALLINT:
var->u64 = *(uint16_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UINT:
var->u64 = *(uint32_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UBIGINT:
var->u64 = *(uint64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_FLOAT:
var->dKey = GET_FLOAT_VAL(tb->buffer);
break;
......@@ -206,6 +226,8 @@ static int normalStmtPrepare(STscStmt* stmt) {
return code;
}
start = i + token.n;
} else if (token.type == TK_ILLEGAL) {
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "invalid sql");
}
i += token.n;
......@@ -259,9 +281,17 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
taosStringBuilderAppendInteger(&sb, var->i64);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
taosStringBuilderAppendUnsignedInteger(&sb, var->u64);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
taosStringBuilderAppendDouble(&sb, var->dKey);
......@@ -1492,6 +1522,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql->isBind = true;
pStmt->pSql = pSql;
pStmt->last = STMT_INIT;
registerSqlObj(pSql);
return pStmt;
}
......@@ -1499,9 +1530,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (sql == NULL) {
tscError("sql is NULL");
......@@ -1547,8 +1576,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.insertParam.numOfParams = 0;
pSql->cmd.batchSize = 0;
registerSqlObj(pSql);
int32_t ret = stmtParseInsertTbTags(pSql, pStmt);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
......@@ -1578,9 +1605,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
SSqlObj* pSql = pStmt->pSql;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1740,6 +1765,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = true;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1748,6 +1774,7 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = false;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1755,6 +1782,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
if (!pStmt->isInsert) {
SNormalStmt* normal = &pStmt->normal;
if (normal->params != NULL) {
......@@ -1776,8 +1806,9 @@ int taos_stmt_close(TAOS_STMT* stmt) {
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
if (pStmt->pSql){
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
}
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
}
taosArrayDestroy(pStmt->mtb.tags);
tfree(pStmt->mtb.sqlstr);
}
......@@ -1790,9 +1821,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->multiTbInsert) {
......@@ -1821,9 +1850,7 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1854,9 +1881,7 @@ int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX || colIdx < 0) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1889,9 +1914,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
int taos_stmt_add_batch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_BIND && pStmt->last != STMT_BIND_COL) {
......@@ -1918,9 +1941,7 @@ int taos_stmt_reset(TAOS_STMT* stmt) {
int taos_stmt_execute(TAOS_STMT* stmt) {
int ret = 0;
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_ADD_BATCH) {
......@@ -1941,11 +1962,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
if (sql == NULL) {
ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
} else {
if (pStmt->pSql != NULL) {
tscFreeSqlObj(pStmt->pSql);
pStmt->pSql = NULL;
}
taosReleaseRef(tscObjRef, pStmt->pSql->self);
pStmt->pSql = taos_query((TAOS*)pStmt->taos, sql);
ret = taos_errno(pStmt->pSql);
free(sql);
......@@ -1966,7 +1983,6 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
tscError("result has been used already.");
return NULL;
}
TAOS_RES* result = pStmt->pSql;
pStmt->pSql = NULL;
return result;
......@@ -1975,9 +1991,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (insert) *insert = pStmt->isInsert;
......@@ -1987,9 +2001,7 @@ int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlObj* pSql = pStmt->pSql;
......@@ -2006,9 +2018,7 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
......
此差异已折叠。
......@@ -337,16 +337,189 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SRpcMsg* rpcMsg = pSchedMsg->ahandle;
SRpcEpSet* pEpSet = pSchedMsg->thandle;
//static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
// SRpcMsg* rpcMsg = pSchedMsg->ahandle;
// SRpcEpSet* pEpSet = pSchedMsg->thandle;
//
// TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
// SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
// if (pSql == NULL) {
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// assert(pSql->self == handle);
//
// STscObj *pObj = pSql->pTscObj;
// SSqlRes *pRes = &pSql->res;
// SSqlCmd *pCmd = &pSql->cmd;
//
// pSql->rpcRid = -1;
//
// if (pObj->signature != pObj) {
// tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
//
// taosRemoveRef(tscObjRef, handle);
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
// tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
// pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
//
// taosRemoveRef(tscObjRef, handle);
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
//
// if (pEpSet) {
// if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
// if (pCmd->command < TSDB_SQL_MGMT) {
// tscUpdateVgroupInfo(pSql, pEpSet);
// } else {
// tscUpdateMgmtEpSet(pSql, pEpSet);
// }
// }
// }
//
// int32_t cmd = pCmd->command;
//
// // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
// if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
// pSql->cmd.insertParam.schemaAttached = 1;
// }
//
// // single table query error need to be handled here.
// if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
// (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
// rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
//
// // 1. super table subquery
// // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
// if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
// TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
// !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
// (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
// // do nothing in case of super table subquery
// } else {
// pSql->retry += 1;
// tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
//
// pSql->res.code = rpcMsg->code; // keep the previous error code
// if (pSql->retry > pSql->maxRetry) {
// tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
// } else {
// // wait for a little bit moment and then retry
// // todo do not sleep in rpc callback thread, add this process into queue to process
// if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// int32_t duration = getWaitingTimeInterval(pSql->retry);
// taosMsleep(duration);
// }
//
// pSql->retryReason = rpcMsg->code;
// rpcMsg->code = tscRenewTableMeta(pSql, 0);
// // if there is an error occurring, proceed to the following error handling procedure.
// if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
// return;
// }
// }
// }
// }
//
// pRes->rspLen = 0;
//
// if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
// tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
// } else {
// pRes->code = rpcMsg->code;
// }
//
// if (pRes->code == TSDB_CODE_SUCCESS) {
// tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
// pSql->retry = 0;
// }
//
// if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
// assert(rpcMsg->msgType == pCmd->msgType + 1);
// pRes->code = rpcMsg->code;
// pRes->rspType = rpcMsg->msgType;
// pRes->rspLen = rpcMsg->contLen;
//
// if (pRes->rspLen > 0 && rpcMsg->pCont) {
// char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
// if (tmp == NULL) {
// pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
// } else {
// pRes->pRsp = tmp;
// memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
// }
// } else {
// tfree(pRes->pRsp);
// }
//
// /*
// * There is not response callback function for submit response.
// * The actual inserted number of points is the first number.
// */
// if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
// SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
// pMsg->code = htonl(pMsg->code);
// pMsg->numOfRows = htonl(pMsg->numOfRows);
// pMsg->affectedRows = htonl(pMsg->affectedRows);
// pMsg->failedRows = htonl(pMsg->failedRows);
// pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
//
// pRes->numOfRows += pMsg->affectedRows;
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
// tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
// } else {
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
// }
// }
//
// if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
// rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
// }
//
// bool shouldFree = tscShouldBeFreed(pSql);
// if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// pRes->code = rpcMsg->code;
// }
// rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
// (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
// }
//
// if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
// tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
// taosRemoveRef(tscObjRef, handle);
// }
//
// taosReleaseRef(tscObjRef, handle);
// rpcFreeCont(rpcMsg->pCont);
// free(rpcMsg);
// free(pEpSet);
//}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -357,15 +530,12 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -377,8 +547,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -409,7 +577,8 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)
|| (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY))) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
......@@ -432,8 +601,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
}
......@@ -511,29 +678,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg));
memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg));
schedMsg.ahandle = (void*)rpcMsgCopy;
SRpcEpSet* pEpSetCopy = NULL;
if (pEpSet != NULL) {
pEpSetCopy = calloc(1, sizeof(SRpcEpSet));
memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet));
}
schedMsg.thandle = (void*)pEpSetCopy;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int doBuildAndSendMsg(SSqlObj *pSql) {
......@@ -698,6 +842,11 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
tableSerialize = totalTables * sizeof(STableIdInfo);
}
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) {
srcColListSize += pCond->len;
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize +
exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
}
......@@ -852,8 +1001,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
(*pMsg) += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
pSqlExpr->param[j].nType = htonl(pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htonl(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen);
......@@ -958,6 +1107,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
pQueryMsg->tableCols[i].type = htons(pCol->type);
pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
pQueryMsg->tableCols[i].flist.filterInfo = 0;
// append the filter information after the basic column information
serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
......@@ -1040,6 +1190,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pCond->len;
}
} else {
pQueryMsg->tagCondLen = 0;
}
if (pQueryInfo->bufLen > 0) {
......@@ -1069,6 +1221,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
} else {
pQueryMsg->tsBuf.tsLen = 0;
pQueryMsg->tsBuf.tsNumOfBlocks = 0;
}
int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator);
......@@ -1106,6 +1261,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pUdfInfo->contLen;
}
} else {
pQueryMsg->udfContentOffset = 0;
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
......@@ -1383,7 +1540,6 @@ int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
}
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
pCmd->payloadLen = sizeof(SShowMsg) + 100;
......@@ -1406,9 +1562,9 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
pthread_mutex_unlock(&pObj->mutex);
char *p = cloneCurrentDBName(pSql);
tstrncpy(pShowMsg->db, p, sizeof(pShowMsg->db));
tfree(p);
} else {
tNameGetFullDbName(&pTableMetaInfo->name, pShowMsg->db);
}
......@@ -1744,7 +1900,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
return pRes->code;
}
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
} else {
tscResetForNextRetrieve(pRes);
}
......@@ -2149,7 +2305,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
pVgroup->vgId = vmsg->vgId;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
pVgroup->epAddr[k].port = vmsg->epAddr[k].port;
pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
}
doUpdateVgroupInfo(pVgroup->vgId, vmsg);
......@@ -2207,6 +2363,7 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
parQueryInfo->pUdfInfo = pQueryInfo->pUdfInfo; // assigned to parent sql obj.
pQueryInfo->pUdfInfo = NULL;
taosReleaseRef(tscObjRef, parent->self);
return TSDB_CODE_SUCCESS;
}
......@@ -2659,7 +2816,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE))) {
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
}
if (pSql->pSubscription != NULL) {
......@@ -2824,7 +2981,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen);
pNew->fp = fp;
pNew->param = (void *)pSql->self;
pNew->param = (void *)pSql->rootObj->self;
tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self);
......@@ -2849,17 +3006,23 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
}
}
taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity);
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
tfree(pTableMetaInfo->pTableMeta);
pTableMetaInfo->tableMetaCapacity = 0;
}
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
if (pMeta && pMeta->id.uid > 0) {
// in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity);
pMeta = pTableMetaInfo->pTableMeta;
if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
}
}
tscDebug("0x%"PRIx64 " %s retrieve tableMeta from cache, numOfCols:%d, numOfTags:%d", pSql->self, name, pMeta->tableInfo.numOfColumns, pMeta->tableInfo.numOfTags);
return TSDB_CODE_SUCCESS;
}
......@@ -2967,6 +3130,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pSql->rootObj->retryReason = pSql->retryReason;
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
......
......@@ -874,6 +874,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->resColumnId = TSDB_RES_COL_ID;
......@@ -982,6 +983,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->rootObj = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
......
......@@ -679,6 +679,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......
......@@ -127,6 +127,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
pSql->rootObj = pSql;
pSub->pSql = pSql;
SSqlCmd* pCmd = &pSql->cmd;
......
此差异已折叠。
......@@ -123,6 +123,10 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
void taos_init_imp(void) {
char temp[128] = {0};
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec());
deltaToUtcInitOnce();
......@@ -197,10 +201,6 @@ void taos_init_imp(void) {
tscRefId = taosOpenRef(200, tscCloseTscObj);
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
tscDebug("client is initialized successfully");
}
......
......@@ -29,6 +29,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "httpInt.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
......@@ -60,6 +61,21 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%d", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%d", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRId64, *(uint64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
......@@ -675,9 +691,13 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}
if (convertNchar) {
pRes->dataConverted = true;
}
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) {
return;
......@@ -690,7 +710,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, true);
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
}
}
......@@ -1496,6 +1516,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
return;
}
int64_t sid = pSql->self;
tscDebug("0x%"PRIx64" start to free sqlObj", pSql->self);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
......@@ -1527,6 +1549,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tfree(pCmd->payload);
pCmd->allocSize = 0;
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql));
free(pSql);
......@@ -2093,6 +2117,22 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return f;
}
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
if (pQueryInfo->fieldsInfo.numOfOutput <= 0 || pQueryInfo->fieldsInfo.internalField == NULL) {
return 0;
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (!pField->visible) {
return i;
}
}
return pQueryInfo->fieldsInfo.numOfOutput;
}
SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
assert(pFieldInfo != NULL);
pFieldInfo->numOfOutput++;
......@@ -3062,6 +3102,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->multigroupResult = true;
}
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
......@@ -3073,7 +3114,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
}
tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) {
......@@ -3155,6 +3195,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
......@@ -3241,11 +3282,6 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
}
......@@ -3259,10 +3295,6 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert(size > index);
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
taosArrayRemove(pVgroupTable, index);
}
......@@ -3271,10 +3303,6 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
if (pInfo->itemList) {
info->itemList = taosArrayDup(pInfo->itemList);
}
......@@ -3392,6 +3420,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->row = 0;
pRes->numOfRows = 0;
pRes->dataConverted = false;
}
void tscInitResForMerge(SSqlRes* pRes) {
......@@ -3421,6 +3450,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->rootObj = pSql->rootObj;
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd;
......@@ -3502,6 +3532,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd;
......@@ -3554,7 +3585,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
pNewQueryInfo->distinct = pQueryInfo->distinct;
if (pNewQueryInfo->buf == NULL) {
......@@ -3777,7 +3808,11 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
SSqlObj *rootObj = pParentSql->rootObj;
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && rootObj->retry < rootObj->maxRetry)) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -3785,32 +3820,45 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", rootObj->self,
tstrerror(code), rootObj->retry);
tscResetSqlCmd(&pParentSql->cmd, true);
tscResetSqlCmd(&rootObj->cmd, true);
code = tsParseSql(pParentSql, true);
code = tsParseSql(rootObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
rootObj->res.code = code;
tscAsyncResultOnError(rootObj);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&rootObj->cmd);
executeQuery(rootObj, pQueryInfo);
return;
}
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
SSqlObj* pParentSql = ps->pParentSql;
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
executeQuery(pParentSql, pQueryInfo);
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
}
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
}
......@@ -3858,7 +3906,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback;
pNew->fetchFp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->rootObj = pSql->rootObj;
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
......@@ -4038,6 +4088,31 @@ int32_t tscInvalidOperationMsg(char* msg, const char* additionalInfo, const char
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql) {
const char* msgFormat1 = "%s:%s";
const char* msgFormat2 = "%s:\'%s\' (%s)";
const char* msgFormat3 = "%s:\'%s\'";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(errMsg != NULL);
sprintf(dstBuffer, msgFormat1, tstrerror(code), errMsg);
return code;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (errMsg != NULL) {
sprintf(dstBuffer, msgFormat2, tstrerror(code), buf, errMsg);
} else {
sprintf(dstBuffer, msgFormat3, tstrerror(code), buf); // no additional information for invalid sql error
}
return code;
}
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
......@@ -4266,7 +4341,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNewVInfo->numOfEps = pvInfo->numOfEps;
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn);
tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN);
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
}
}
......@@ -4279,34 +4354,10 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
tfree(pVgroupInfo->epAddr[j].fqdn);
}
for(int32_t j = pVgroupInfo->numOfEps; j < TSDB_MAX_REPLICA; j++) {
assert( pVgroupInfo->epAddr[j].fqdn == NULL );
}
}
tfree(vgroupList);
return NULL;
}
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
tfree(dst->epAddr[i].fqdn);
dst->epAddr[i].port = src->epAddr[i].port;
assert(dst->epAddr[i].fqdn == NULL);
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
}
char* serializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n);
......@@ -4398,7 +4449,9 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
STableMeta* pChild = *ppChild;
STableMeta* pChild1;
taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz);
if(NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
tfree(p);
}
// tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table.
......@@ -4449,7 +4502,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) {
SVgroupsInfo* pInfo = calloc(1, size);
pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups;
for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) {
tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]);
memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg));
}
return pInfo;
}
......@@ -4682,6 +4735,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
......@@ -4756,7 +4810,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
}
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
pQueryAttr->fillVal = calloc(pQueryInfo->numOfFillVal, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryInfo->numOfFillVal * sizeof(int64_t));
}
......@@ -4954,3 +5008,31 @@ void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
taosHashRemove(tscTableMetaMap, fname, len);
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap));
}
char* cloneCurrentDBName(SSqlObj* pSql) {
char *p = NULL;
HttpContext *pCtx = NULL;
pthread_mutex_lock(&pSql->pTscObj->mutex);
STscObj *pTscObj = pSql->pTscObj;
switch (pTscObj->from) {
case TAOS_REQ_FROM_HTTP:
pCtx = pSql->param;
if (pCtx && pCtx->db[0] != '\0') {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] = {0};
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
assert(len <= sizeof(db));
p = strdup(db);
}
break;
default:
break;
}
if (p == NULL) {
p = strdup(pSql->pTscObj->db);
}
pthread_mutex_unlock(&pSql->pTscObj->mutex);
return p;
}
......@@ -125,6 +125,7 @@ extern int32_t tsHttpMaxThreads;
extern int8_t tsHttpEnableCompress;
extern int8_t tsHttpEnableRecordSql;
extern int8_t tsTelegrafUseFieldNum;
extern int8_t tsHttpDbNameMandatory;
// mqtt
extern int8_t tsEnableMqttModule;
......
......@@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull) {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
......@@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if (forceSetNull) {
if (forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
......@@ -533,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
}
......@@ -577,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) {
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
......@@ -595,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
}
target->numOfRows++;
......
......@@ -84,7 +84,7 @@ int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
int32_t tsMaxNumOfOrderedResults = 100000;
int32_t tsMaxNumOfOrderedResults = 1000000;
// 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10;
......@@ -165,6 +165,7 @@ int32_t tsHttpMaxThreads = 2;
int8_t tsHttpEnableCompress = 1;
int8_t tsHttpEnableRecordSql = 0;
int8_t tsTelegrafUseFieldNum = 0;
int8_t tsHttpDbNameMandatory = 0;
// mqtt
int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default
......@@ -1017,8 +1018,8 @@ static void doInitGlobalConfig(void) {
cfg.ptr = &tsMaxNumOfOrderedResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MAX_SQL_LEN;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.minValue = 100000;
cfg.maxValue = 100000000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
......@@ -1217,10 +1218,10 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "topicBianryLen";
cfg.option = "topicBinaryLen";
cfg.ptr = &tsTopicBianryLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 16;
cfg.maxValue = 16000;
cfg.ptrLength = 0;
......@@ -1267,6 +1268,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "httpDbNameMandatory";
cfg.ptr = &tsHttpDbNameMandatory;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// debug flag
cfg.option = "numOfLogLines";
cfg.ptr = &tsNumOfLogLines;
......@@ -1570,7 +1581,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum <= TSDB_CFG_MAX_NUM);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......@@ -1591,8 +1601,6 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = MAX_FLOAT;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dPrecision";
......@@ -1624,6 +1632,9 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}
......
......@@ -31,12 +31,12 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
switch (token->type) {
case TSDB_DATA_TYPE_BOOL: {
int32_t k = strncasecmp(token->z, "true", 4);
if (k == 0) {
if (strncasecmp(token->z, "true", 4) == 0) {
pVar->i64 = TSDB_TRUE;
} else {
assert(strncasecmp(token->z, "false", 5) == 0);
} else if (strncasecmp(token->z, "false", 5) == 0) {
pVar->i64 = TSDB_FALSE;
} else {
return;
}
break;
......
......@@ -117,7 +117,6 @@
<exclude>**/DatetimeBefore1970Test.java</exclude>
<exclude>**/FailOverTest.java</exclude>
<exclude>**/InvalidResultSetPointerTest.java</exclude>
<exclude>**/RestfulConnectionTest.java</exclude>
<exclude>**/TSDBJNIConnectorTest.java</exclude>
<exclude>**/TaosInfoMonitorTest.java</exclude>
<exclude>**/UnsignedNumberJniTest.java</exclude>
......
......@@ -40,13 +40,13 @@ public class TSDBError {
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_SUBSCRIBE_FAILED, "failed to create subscription");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING, "Unsupported encoding");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_TDENGINE_ERROR, "internal error of database");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_TDENGINE_ERROR, "internal error of database, please see taoslog for more details");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL, "JNI connection is NULL");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_RESULT_SET_NULL, "JNI result set is NULL");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_NUM_OF_FIELDS_0, "invalid num of fields");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_SQL_NULL, "empty sql string");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_FETCH_END, "fetch to the end of resultSet");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY, "JNI alloc memory failed");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY, "JNI alloc memory failed, please see taoslog for more details");
}
public static SQLException createSQLException(int errorCode) {
......
......@@ -278,25 +278,20 @@ public class TSDBJNIConnector {
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
public long prepareStmt(String sql) throws SQLException {
long stmt;
try {
stmt = prepareStmtImp(sql.getBytes(), this.taos);
} catch (Exception e) {
e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING);
}
long stmt = prepareStmtImp(sql.getBytes(), this.taos);
if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL, "connection already closed");
}
if (stmt == TSDBConstants.JNI_SQL_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_SQL_NULL);
}
if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY);
}
if (stmt == TSDBConstants.JNI_TDENGINE_ERROR) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_TDENGINE_ERROR);
}
return stmt;
}
......@@ -313,8 +308,7 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
......
......@@ -32,6 +32,7 @@ import java.util.List;
import com.taosdata.jdbc.utils.NullType;
public class TSDBResultSetBlockData {
private static final int BINARY_LENGTH_OFFSET = 2;
private int numOfRows = 0;
private int rowIndex = 0;
......@@ -404,10 +405,8 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isBinaryNull(dest, length)) {
......@@ -419,16 +418,13 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_NCHAR: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isNcharNull(dest, length)) {
return null;
}
try {
String charset = TaosGlobalConfig.getCharset();
return new String(dest, charset);
......
......@@ -18,7 +18,7 @@ public class RestfulConnection extends AbstractConnection {
private final String url;
private final String database;
private final String token;
/******************************************************/
private boolean isClosed;
private final DatabaseMetaData metadata;
......
......@@ -88,17 +88,24 @@ public class RestfulStatement extends AbstractStatement {
}
private String getUrl() throws SQLException {
String dbname = conn.getClientInfo(TSDBDriver.PROPERTY_KEY_DBNAME);
if (dbname == null || dbname.trim().isEmpty()) {
dbname = "";
} else {
dbname = "/" + dbname.toLowerCase();
}
TimestampFormat timestampFormat = TimestampFormat.valueOf(conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).trim().toUpperCase());
String url;
switch (timestampFormat) {
case TIMESTAMP:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt" + dbname;
break;
case UTC:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc" + dbname;
break;
default:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql" + dbname;
}
return url;
}
......
此差异已折叠。
# encoding:UTF-8
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册