提交 e04a8ee2 编写于 作者: L lichuang

[TD-1568]merge from develop

......@@ -10,3 +10,6 @@
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "deps/jemalloc"]
path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc
......@@ -59,6 +59,11 @@ IF (TD_LINUX_64)
MESSAGE(STATUS "linux64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV)
IF (JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
ENDIF ()
ENDIF ()
IF (TD_LINUX_32)
......
......@@ -72,3 +72,8 @@ IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
SET(TD_RANDOM_NETWORK_FAIL TRUE)
MESSAGE(STATUS "build with random-network-fail enabled")
ENDIF ()
IF (${JEMALLOC_ENABLED} MATCHES "true")
SET(TD_JEMALLOC_ENABLED TRUE)
MESSAGE(STATUS "build with jemalloc enabled")
ENDIF ()
......@@ -18,3 +18,16 @@ ENDIF ()
IF (TD_DARWIN AND TD_MQTT)
ADD_SUBDIRECTORY(MQTT-C)
ENDIF ()
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
MESSAGE("setup dpes/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
MESSAGE("binary dir:" ${CMAKE_BINARY_DIR})
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
ENDIF ()
Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756
......@@ -107,9 +107,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
![replica-forward.png](page://images/architecture/replica-forward.png)
1. 应用对写请求做基本的合法性检查,通过,则给请求包打上一个版本号(version, 单调递增)
1. 应用对写请求做基本的合法性检查,通过,则给请求包打上一个版本号(version, 单调递增)
2. 应用将打上版本号的写请求封装一个WAL Head, 写入WAL(Write Ahead Log)
3. 应用调用API syncForwardToPeer,如vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
3. 应用调用API syncForwardToPeer,如vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
4. vnode B收到Forward消息后,调用回调函数writeToCache, 交给应用处理
5. vnode B应用在写入成功后,都需要调用syncAckForward通知sync模块已经写入成功。
6. 如果quorum大于1,vnode B需要等待应用的回复确认,收到确认后,vnode B发送Forward Response消息给node A。
......@@ -219,7 +219,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
不同之处:
- 选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request, 如果超过半数回答Yes, 这个candidate就成为Leader,开始一个新的term. 而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组类传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会给节点组内其他节点进行广播的。Raft里不存在这样的机制,因此需要投票来解决。
- 选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request,如果超过半数回答Yes,这个candidate就成为Leader,开始一个新的term。而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组内传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会向节点组内其他节点进行广播。Raft里不存在这样的机制,因此需要投票来解决。
- 对WAL的一条记录,Raft用term + index来做唯一标识。但TDengine只用version(类似index),在TDengine实现里,仅仅用version是完全可行的, 因为TDengine的选举机制,没有term的概念。
如果整个虚拟节点组全部宕机,重启,但不是所有虚拟节点都上线,这个时候TDengine是不会选出master的,因为未上线的节点有可能有最高version的数据。而RAFT协议,只要超过半数上线,就会选出Leader。
......
......@@ -343,7 +343,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
对于采集的数据,一般有保留时长,这个时长由系统配置参数keep决定。超过这个设置天数的数据文件,将被系统自动删除,释放存储空间。
给定days与keep两个参数,一个vnode总的数据文件数为:keep/days。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
给定days与keep两个参数,一个典型工作状态的vnode中总的数据文件数为:`向上取整(keep/days)+1`。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
在每个数据文件里,一张表的数据是一块一块存储的。一张表可以有一到多个数据文件块。在一个文件块里,数据是列式存储的,占用的是一片连续的存储空间,这样大大提高读取速度。文件块的大小由系统参数maxRows(每块最大记录条数)决定,缺省值为4096。这个值不宜过大,也不宜过小。过大,定位具体时间段的数据的搜索时间会变长,影响读取速度;过小,数据块的索引太大,压缩效率偏低,也影响读取速度。
......
......@@ -516,7 +516,7 @@ conn.close()
- _TDengineCursor_ 类
参考python中help(taos.TDengineCursor)。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能线程共享使用,否则会导致返回结果出现错误。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能线程共享使用,否则会导致返回结果出现错误。
- _connect_ 方法
......
......@@ -26,7 +26,7 @@
| TSDB_CODE_COM_OUT_OF_MEMORY | 0 | 0x0102 | "Out of memory" | -2147483390 |
| TSDB_CODE_COM_INVALID_CFG_MSG | 0 | 0x0103 | "Invalid config message" | -2147483389 |
| TSDB_CODE_COM_FILE_CORRUPTED | 0 | 0x0104 | "Data file corrupted" | -2147483388 |
| TSDB_CODE_TSC_INVALID_SQL | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_OPERATION | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_QHANDLE | 0 | 0x0201 | "Invalid qhandle" | -2147483135 |
| TSDB_CODE_TSC_INVALID_TIME_STAMP | 0 | 0x0202 | "Invalid combination of client/service time" | -2147483134 |
| TSDB_CODE_TSC_INVALID_VALUE | 0 | 0x0203 | "Invalid value in client" | -2147483133 |
......
......@@ -37,7 +37,7 @@ taos> DESCRIBE meters;
- Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
- 时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如 `select * from t1 where ts > now-2w and ts <= now-1w`,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableMicrosecond 就可以支持微秒。
TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传递的 PRECISION 参数就可以支持微秒。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
......@@ -76,7 +76,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
4) 一条SQL 语句的最大长度为65480个字符;
5) 数据库还有更多与存储相关的配置参数,请参见 [服务端配置](https://www.taosdata.com/cn/documentation/taos-sql#management) 章节。
5) 数据库还有更多与存储相关的配置参数,请参见 [服务端配置](https://www.taosdata.com/cn/documentation/administrator#config) 章节。
- **显示系统当前参数**
......@@ -400,6 +400,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
说明:`(tb1_field1_name, ...)`的部分可以省略掉,这样就是使用全列模式写入——也即在 VALUES 部分提供的数据,必须为数据表的每个列都显式地提供数据。全列写入速度会远快于指定列,因此建议尽可能采用全列写入方式,此时空列可以填入NULL。
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
```mysql
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
......@@ -423,9 +424,9 @@ Query OK, 1 row(s) in set (0.001029s)
taos> SHOW TABLES;
Query OK, 0 row(s) in set (0.000946s)
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2);
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('a');
DB error: invalid SQL: keyword VALUES or FILE required
DB error: invalid SQL: 'a' (invalid timestamp) (0.039494s)
taos> SHOW TABLES;
table_name | created_time | columns | stable_name |
......
......@@ -24,14 +24,14 @@ echo "compile_dir: ${compile_dir}"
echo "pkg_dir: ${pkg_dir}"
if [ -d ${pkg_dir} ]; then
rm -rf ${pkg_dir}
rm -rf ${pkg_dir}
fi
mkdir -p ${pkg_dir}
cd ${pkg_dir}
libfile="libtaos.so.${tdengine_ver}"
# create install dir
# create install dir
install_home_path="/usr/local/taos"
mkdir -p ${pkg_dir}${install_home_path}
mkdir -p ${pkg_dir}${install_home_path}/bin
......@@ -42,7 +42,7 @@ mkdir -p ${pkg_dir}${install_home_path}/examples
mkdir -p ${pkg_dir}${install_home_path}/include
mkdir -p ${pkg_dir}${install_home_path}/init.d
mkdir -p ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/cfg/taos.cfg ${pkg_dir}${install_home_path}/cfg
cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_path}/init.d
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
......@@ -54,7 +54,7 @@ cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_pat
cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
cp ${compile_dir}/../src/inc/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../src/inc/taoserror.h ${pkg_dir}${install_home_path}/include
cp -r ${top_dir}/tests/examples/* ${pkg_dir}${install_home_path}/examples
......@@ -67,7 +67,41 @@ fi
cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector
cp ${compile_dir}/build/lib/taos-jdbcdriver*dist.* ${pkg_dir}${install_home_path}/connector ||:
cp ${compile_dir}/build/lib/taos-jdbcdriver*.* ${pkg_dir}${install_home_path}/connector ||:
if [ -f ${compile_dir}/build/bin/jemalloc-config ]; then
install_user_local_path="/usr/local"
mkdir -p ${pkg_dir}${install_user_local_path}/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${compile_dir}/build/bin/jemalloc-config ${pkg_dir}${install_user_local_path}/bin/
if [ -f ${compile_dir}/build/bin/jemalloc.sh ]; then
cp ${compile_dir}/build/bin/jemalloc.sh ${pkg_dir}${install_user_local_path}/bin/
fi
if [ -f ${compile_dir}/build/bin/jeprof ]; then
cp ${compile_dir}/build/bin/jeprof ${pkg_dir}${install_user_local_path}/bin/
fi
if [ -f ${compile_dir}/build/include/jemalloc/jemalloc.h ]; then
cp ${compile_dir}/build/include/jemalloc/jemalloc.h ${pkg_dir}${install_user_local_path}/include/jemalloc/
fi
if [ -f ${compile_dir}/build/lib/libjemalloc.so.2 ]; then
cp ${compile_dir}/build/lib/libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/
ln -sf libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/libjemalloc.so
fi
if [ -f ${compile_dir}/build/lib/libjemalloc.a ]; then
cp ${compile_dir}/build/lib/libjemalloc.a ${pkg_dir}${install_user_local_path}/lib/
fi
if [ -f ${compile_dir}/build/lib/libjemalloc_pic.a ]; then
cp ${compile_dir}/build/lib/libjemalloc_pic.a ${pkg_dir}${install_user_local_path}/lib/
fi
if [ -f ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ]; then
cp ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ${pkg_dir}${install_user_local_path}/lib/pkgconfig/
fi
if [ -f ${compile_dir}/build/share/doc/jemalloc/jemalloc.html ]; then
cp ${compile_dir}/build/share/doc/jemalloc/jemalloc.html ${pkg_dir}${install_user_local_path}/share/doc/jemalloc/
fi
if [ -f ${compile_dir}/build/share/man/man3/jemalloc.3 ]; then
cp ${compile_dir}/build/share/man/man3/jemalloc.3 ${pkg_dir}${install_user_local_path}/share/man/man3/
fi
fi
cp -r ${compile_dir}/../packaging/deb/DEBIAN ${pkg_dir}/
chmod 755 ${pkg_dir}/DEBIAN/*
......@@ -75,7 +109,7 @@ chmod 755 ${pkg_dir}/DEBIAN/*
# modify version of control
debver="Version: "$tdengine_ver
sed -i "2c$debver" ${pkg_dir}/DEBIAN/control
#get taos version, then set deb name
......@@ -90,7 +124,7 @@ fi
if [ "$verType" == "beta" ]; then
debname=${debname}-${verType}".deb"
elif [ "$verType" == "stable" ]; then
elif [ "$verType" == "stable" ]; then
debname=${debname}".deb"
else
echo "unknow verType, nor stabel or beta"
......@@ -101,7 +135,7 @@ fi
dpkg -b ${pkg_dir} $debname
echo "make deb package success!"
cp ${pkg_dir}/*.deb ${output_dir}
cp ${pkg_dir}/*.deb ${output_dir}
# clean tmep dir
rm -rf ${pkg_dir}
......
......@@ -6,7 +6,7 @@ set -e
#set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta]
# -l [full | lite]
......@@ -22,11 +22,12 @@ cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic]
allocator=glibc # [glibc | jemalloc]
dbName=taos # [taos | power]
verNumber=""
verNumberComp="2.0.0.0"
while getopts "hv:V:c:o:l:s:d:n:m:" arg
while getopts "hv:V:c:o:l:s:d:a:n:m:" arg
do
case $arg in
v)
......@@ -53,6 +54,10 @@ do
#echo "dbName=$OPTARG"
dbName=$(echo $OPTARG)
;;
a)
#echo "allocator=$OPTARG"
allocator=$(echo $OPTARG)
;;
n)
#echo "verNumber=$OPTARG"
verNumber=$(echo $OPTARG)
......@@ -71,20 +76,21 @@ do
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] "
echo " -l [full | lite] "
echo " -a [glibc | jemalloc] "
echo " -s [static | dynamic] "
echo " -d [taos | power] "
echo " -n [version number] "
echo " -m [compatible version number] "
exit 0
;;
?) #unknow option
?) #unknow option
echo "unkonw argument"
exit 1
;;
esac
done
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} verNumber=${verNumber} verNumberComp=${verNumberComp}"
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} allocator=${allocator} verNumber=${verNumber} verNumberComp=${verNumberComp}"
curr_dir=$(pwd)
......@@ -118,7 +124,7 @@ function vercomp () {
echo 0
exit 0
fi
local IFS=.
local i ver1=($1) ver2=($2)
......@@ -164,7 +170,7 @@ if [[ "$verMode" == "cluster" ]]; then
else
gitinfoOfInternal=NULL
fi
cd ${curr_dir}
# 2. cmake executable file
......@@ -180,12 +186,18 @@ else
fi
cd ${compile_dir}
if [[ "$allocator" == "jemalloc" ]]; then
allocator_macro="-DJEMALLOC_ENABLED=true"
else
allocator_macro=""
fi
# check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then
if [ "$verMode" != "cluster" ]; then
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode}
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} ${allocator_macro}
else
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp}
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} ${allocator_macro}
fi
else
echo "input cpuType=${cpuType} error!!!"
......@@ -199,9 +211,9 @@ cd ${curr_dir}
# 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
ret='0'
ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
......@@ -214,9 +226,9 @@ if [ "$osType" != "Darwin" ]; then
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
......@@ -229,11 +241,11 @@ if [ "$osType" != "Darwin" ]; then
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
echo "====do tar.gz package for all systems===="
cd ${script_dir}/tools
if [[ "$dbName" == "taos" ]]; then
if [[ "$dbName" == "taos" ]]; then
${csudo} ./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
......
#!/bin/bash
#
# Generate rpm package for centos
# Generate rpm package for centos
set -e
# set -x
......@@ -60,7 +60,7 @@ ${csudo} rpmbuild --define="_version ${tdengine_ver}" --define="_topdir ${pkg_di
# copy rpm package to output_dir, and modify package name, then clean temp dir
#${csudo} cp -rf RPMS/* ${output_dir}
cp_rpm_package ${pkg_dir}/RPMS
cp_rpm_package ${pkg_dir}/RPMS
if [ "$verMode" == "cluster" ]; then
......@@ -74,7 +74,7 @@ fi
if [ "$verType" == "beta" ]; then
rpmname=${rpmname}-${verType}".rpm"
elif [ "$verType" == "stable" ]; then
elif [ "$verType" == "stable" ]; then
rpmname=${rpmname}".rpm"
else
echo "unknow verType, nor stabel or beta"
......
%define homepath /usr/local/taos
%define userlocalpath /usr/local
%define cfg_install_dir /etc/taos
%define __strip /bin/true
......@@ -12,22 +13,22 @@ URL: www.taosdata.com
AutoReqProv: no
#BuildRoot: %_topdir/BUILDROOT
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
#Prefix: /usr/local/taos
#BuildRequires:
#Requires:
#BuildRequires:
#Requires:
%description
Big Data Platform Designed and Optimized for IoT
#"prep" Nothing needs to be done
#"prep" Nothing needs to be done
#%prep
#%setup -q
#%setup -T
#%setup -T
#"build" Nothing needs to be done
#"build" Nothing needs to be done
#%build
#%configure
#make %{?_smp_mflags}
......@@ -75,9 +76,53 @@ fi
cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
cp %{_compiledir}/build/lib/taos-jdbcdriver*dist.* %{buildroot}%{homepath}/connector ||:
cp %{_compiledir}/build/lib/taos-jdbcdriver*.* %{buildroot}%{homepath}/connector ||:
cp -r %{_compiledir}/../tests/examples/* %{buildroot}%{homepath}/examples
if [ -f %{_compiledir}/build/bin/jemalloc-config ]; then
mkdir -p %{buildroot}%{userlocalpath}/bin
mkdir -p %{buildroot}%{userlocalpath}/lib
mkdir -p %{buildroot}%{userlocalpath}/lib/pkgconfig
mkdir -p %{buildroot}%{userlocalpath}/include
mkdir -p %{buildroot}%{userlocalpath}/include/jemalloc
mkdir -p %{buildroot}%{userlocalpath}/share
mkdir -p %{buildroot}%{userlocalpath}/share/doc
mkdir -p %{buildroot}%{userlocalpath}/share/doc/jemalloc
mkdir -p %{buildroot}%{userlocalpath}/share/man
mkdir -p %{buildroot}%{userlocalpath}/share/man/man3
cp %{_compiledir}/build/bin/jemalloc-config %{buildroot}%{userlocalpath}/bin/
if [ -f %{_compiledir}/build/bin/jemalloc.sh ]; then
cp %{_compiledir}/build/bin/jemalloc.sh %{buildroot}%{userlocalpath}/bin/
fi
if [ -f %{_compiledir}/build/bin/jeprof ]; then
cp %{_compiledir}/build/bin/jeprof %{buildroot}%{userlocalpath}/bin/
fi
if [ -f %{_compiledir}/build/include/jemalloc/jemalloc.h ]; then
cp %{_compiledir}/build/include/jemalloc/jemalloc.h %{buildroot}%{userlocalpath}/include/jemalloc/
fi
if [ -f %{_compiledir}/build/lib/libjemalloc.so.2 ]; then
cp %{_compiledir}/build/lib/libjemalloc.so.2 %{buildroot}%{userlocalpath}/lib/
ln -sf libjemalloc.so.2 %{buildroot}%{userlocalpath}/lib/libjemalloc.so
fi
if [ -f %{_compiledir}/build/lib/libjemalloc.a ]; then
cp %{_compiledir}/build/lib/libjemalloc.a %{buildroot}%{userlocalpath}/lib/
fi
if [ -f %{_compiledir}/build/lib/libjemalloc_pic.a ]; then
cp %{_compiledir}/build/lib/libjemalloc_pic.a %{buildroot}%{userlocalpath}/lib/
fi
if [ -f %{_compiledir}/build/lib/pkgconfig/jemalloc.pc ]; then
cp %{_compiledir}/build/lib/pkgconfig/jemalloc.pc %{buildroot}%{userlocalpath}/lib/pkgconfig/
fi
if [ -f %{_compiledir}/build/share/doc/jemalloc/jemalloc.html ]; then
cp %{_compiledir}/build/share/doc/jemalloc/jemalloc.html %{buildroot}%{userlocalpath}/share/doc/jemalloc/
fi
if [ -f %{_compiledir}/build/share/man/man3/jemalloc.3 ]; then
cp %{_compiledir}/build/share/man/man3/jemalloc.3 %{buildroot}%{userlocalpath}/share/man/man3/
fi
fi
#Scripts executed before installation
%pre
csudo=""
......@@ -103,7 +148,7 @@ fi
# if taos.cfg already softlink, remove it
if [ -f %{cfg_install_dir}/taos.cfg ]; then
${csudo} rm -f %{homepath}/cfg/taos.cfg || :
fi
fi
# there can not libtaos.so*, otherwise ln -s error
${csudo} rm -f %{homepath}/driver/libtaos* || :
......@@ -116,18 +161,18 @@ if command -v sudo > /dev/null; then
fi
cd %{homepath}/script
${csudo} ./post.sh
# Scripts executed before uninstall
%preun
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo"
fi
# only remove package to call preun.sh, not but update(2)
# only remove package to call preun.sh, not but update(2)
if [ $1 -eq 0 ];then
#cd %{homepath}/script
#${csudo} ./preun.sh
if [ -f %{homepath}/script/preun.sh ]; then
cd %{homepath}/script
${csudo} ./preun.sh
......@@ -135,7 +180,7 @@ if [ $1 -eq 0 ];then
bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib"
inc_link_dir="/usr/include"
data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log"
cfg_link_dir="/usr/local/taos/cfg"
......@@ -149,20 +194,20 @@ if [ $1 -eq 0 ];then
${csudo} rm -f ${inc_link_dir}/taos.h || :
${csudo} rm -f ${inc_link_dir}/taoserror.h || :
${csudo} rm -f ${lib_link_dir}/libtaos.* || :
${csudo} rm -f ${log_link_dir} || :
${csudo} rm -f ${data_link_dir} || :
pid=$(ps -ef | grep "taosd" | grep -v "grep" | awk '{print $2}')
if [ -n "$pid" ]; then
${csudo} kill -9 $pid || :
fi
fi
fi
fi
fi
# Scripts executed after uninstall
%postun
# clean build dir
%clean
csudo=""
......
此差异已折叠。
#!/bin/bash
#
# This file is used to install TAOS time-series database on linux systems. The operating system
# This file is used to install TAOS time-series database on linux systems. The operating system
# is required to use systemd to manage services at boot
set -e
# set -x
# -----------------------Variables definition---------------------
# -----------------------Variables definition
source_dir=$1
binary_dir=$2
osType=$3
......@@ -71,9 +71,9 @@ if [ "$osType" != "Darwin" ]; then
service_mod=0
elif $(which service &> /dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
service_config_dir="/etc/init.d"
if $(which chkconfig &> /dev/null); then
initd_mod=1
initd_mod=1
elif $(which insserv &> /dev/null); then
initd_mod=2
elif $(which update-rc.d &> /dev/null); then
......@@ -123,9 +123,9 @@ function kill_taosd() {
function install_main_path() {
#create install main dir and all sub dir
${csudo} rm -rf ${install_main_dir} || :
${csudo} mkdir -p ${install_main_dir}
${csudo} mkdir -p ${install_main_dir}
${csudo} mkdir -p ${install_main_dir}/cfg
${csudo} mkdir -p ${install_main_dir}/bin
${csudo} mkdir -p ${install_main_dir}/bin
${csudo} mkdir -p ${install_main_dir}/connector
${csudo} mkdir -p ${install_main_dir}/driver
${csudo} mkdir -p ${install_main_dir}/examples
......@@ -176,6 +176,49 @@ function install_bin() {
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
fi
}
function install_jemalloc() {
if [ "$osType" != "Darwin" ]; then
/usr/bin/install -c -d /usr/local/bin
if [ -f ${binary_dir}/build/bin/jemalloc-config ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc-config /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jemalloc.sh ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc.sh /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jeprof ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jeprof /usr/local/bin
fi
if [ -f ${binary_dir}/build/include/jemalloc/jemalloc.h ]; then
/usr/bin/install -c -d /usr/local/include/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/include/jemalloc/jemalloc.h /usr/local/include/jemalloc
fi
if [ -f ${binary_dir}/build/lib/libjemalloc.so.2 ]; then
/usr/bin/install -c -d /usr/local/lib
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
/usr/bin/install -c -d /usr/local/lib
if [ -f ${binary_dir}/build/lib/libjemalloc.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/pkgconfig/jemalloc.pc ]; then
/usr/bin/install -c -d /usr/local/lib/pkgconfig
/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi
fi
if [ -f ${binary_dir}/build/share/doc/jemalloc/jemalloc.html ]; then
/usr/bin/install -c -d /usr/local/share/doc/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc
fi
if [ -f ${binary_dir}/build/share/man/man3/jemalloc.3 ]; then
/usr/bin/install -c -d /usr/local/share/man/man3
/usr/bin/install -c -m 644 ${binary_dir}/build/share/man/man3/jemalloc.3 /usr/local/share/man/man3
fi
fi
}
function install_lib() {
# Remove links
......@@ -183,12 +226,12 @@ function install_lib() {
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${lib64_link_dir}/libtaos.* || :
fi
if [ "$osType" != "Darwin" ]; then
${csudo} cp ${binary_dir}/build/lib/libtaos.so.${verNumber} ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
if [ -d "${lib64_link_dir}" ]; then
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
......@@ -198,7 +241,9 @@ function install_lib() {
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
install_jemalloc
if [ "$osType" != "Darwin" ]; then
${csudo} ldconfig
fi
......@@ -206,26 +251,26 @@ function install_lib() {
function install_header() {
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
}
function install_config() {
#${csudo} rm -f ${install_main_dir}/cfg/taos.cfg || :
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/*
fi
fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
}
function install_log() {
function install_log() {
${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then
......@@ -239,7 +284,7 @@ function install_log() {
function install_data() {
${csudo} mkdir -p ${data_dir}
${csudo} ln -s ${data_dir} ${install_main_dir}/data
${csudo} ln -s ${data_dir} ${install_main_dir}/data
}
function install_connector() {
......@@ -254,8 +299,8 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!"
fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
}
function install_examples() {
......@@ -264,8 +309,8 @@ function install_examples() {
function clean_service_on_sysvinit() {
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || :
#${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || :
if pidof taosd &> /dev/null; then
${csudo} service taosd stop || :
fi
......@@ -277,9 +322,9 @@ function clean_service_on_sysvinit() {
elif ((${initd_mod}==3)); then
${csudo} update-rc.d -f taosd remove || :
fi
${csudo} rm -f ${service_config_dir}/taosd || :
if $(which init &> /dev/null); then
${csudo} init q || :
fi
......@@ -298,10 +343,10 @@ function install_service_on_sysvinit() {
${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} grep -q -F "$restart_config_str" /etc/inittab || ${csudo} bash -c "echo '${restart_config_str}' >> /etc/inittab"
if ((${initd_mod}==1)); then
${csudo} chkconfig --add taosd || :
${csudo} chkconfig --level 2345 taosd on || :
......@@ -323,7 +368,7 @@ function clean_service_on_systemd() {
${csudo} systemctl disable taosd &> /dev/null || echo &> /dev/null
${csudo} rm -f ${taosd_service_config}
}
}
# taos:2345:respawn:/etc/init.d/taosd start
......@@ -383,7 +428,7 @@ function update_TDengine() {
sleep 1
fi
fi
install_main_path
install_log
......@@ -431,16 +476,16 @@ function install_TDengine() {
# Start to install
if [ "$osType" != "Darwin" ]; then
echo -e "${GREEN}Start to install TDEngine...${NC}"
else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
fi
install_main_path
install_main_path
if [ "$osType" != "Darwin" ]; then
if [ "$osType" != "Darwin" ]; then
install_data
fi
install_log
install_log
install_header
install_lib
install_connector
......@@ -452,7 +497,7 @@ function install_TDengine() {
install_service
fi
install_config
install_config
if [ "$osType" != "Darwin" ]; then
# Ask if to start the service
......
......@@ -30,12 +30,12 @@ else
install_dir="${release_dir}/TDengine-server-${version}"
fi
# Directories and files.
# Directories and files
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taosd
strip ${build_dir}/bin/taosd
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
else
else
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator\
${script_dir}/remove.sh ${script_dir}/set_core.sh ${script_dir}/startPre.sh ${script_dir}/taosd-dump-cfg.gdb"
fi
......@@ -73,10 +73,43 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taos
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${build_dir}/bin/jemalloc-config ${install_dir}/jemalloc/bin
if [ -f ${build_dir}/bin/jemalloc.sh ]; then
cp ${build_dir}/bin/jemalloc.sh ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/bin/jeprof ]; then
cp ${build_dir}/bin/jeprof ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/include/jemalloc/jemalloc.h ]; then
cp ${build_dir}/include/jemalloc/jemalloc.h ${install_dir}/jemalloc/include/jemalloc
fi
if [ -f ${build_dir}/lib/libjemalloc.so.2 ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi
if [ -f ${build_dir}/share/doc/jemalloc/jemalloc.html ]; then
cp ${build_dir}/share/doc/jemalloc/jemalloc.html ${install_dir}/jemalloc/share/doc/jemalloc
fi
if [ -f ${build_dir}/share/man/man3/jemalloc.3 ]; then
cp ${build_dir}/share/man/man3/jemalloc.3 ${install_dir}/jemalloc/share/man/man3
fi
fi
if [ "$verMode" == "cluster" ]; then
sed 's/verMode=edge/verMode=cluster/g' ${install_dir}/bin/remove.sh >> remove_temp.sh
mv remove_temp.sh ${install_dir}/bin/remove.sh
mkdir -p ${install_dir}/nginxd && cp -r ${nginx_dir}/* ${install_dir}/nginxd
cp ${nginx_dir}/png/taos.png ${install_dir}/nginxd/admin/images/taos.png
rm -rf ${install_dir}/nginxd/png
......@@ -132,7 +165,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ -d ${examples_dir}/JDBC/taosdemo/target ]; then
rm -rf ${examples_dir}/JDBC/taosdemo/target
fi
cp -r ${examples_dir}/JDBC ${install_dir}/examples
cp -r ${examples_dir}/matlab ${install_dir}/examples
cp -r ${examples_dir}/python ${install_dir}/examples
......@@ -142,7 +175,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/C# ${install_dir}/examples
fi
# Copy driver
mkdir -p ${install_dir}/driver
mkdir -p ${install_dir}/driver
cp ${lib_files} ${install_dir}/driver
# Copy connector
......@@ -168,7 +201,7 @@ fi
# exit 1
cd ${release_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${osType}-${cpuType}
......@@ -185,8 +218,8 @@ fi
if [ "$verType" == "beta" ]; then
pkg_name=${pkg_name}-${verType}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
else
echo "unknow verType, nor stabel or beta"
exit 1
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCLOCALMERGE_H
#define TDENGINE_TSCLOCALMERGE_H
#ifndef TDENGINE_TSCGLOBALMERGE_H
#define TDENGINE_TSCGLOBALMERGE_H
#ifdef __cplusplus
extern "C" {
......@@ -24,7 +24,7 @@ extern "C" {
#include "qFill.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tsclient.h"
#include "qExecutor.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
......@@ -38,40 +38,32 @@ typedef struct SLocalDataSource {
tFilePage filePage;
} SLocalDataSource;
typedef struct SLocalMerger {
SLocalDataSource ** pLocalDataSrc;
typedef struct SGlobalMerger {
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo * pLoserTree;
tFilePage * pResultBuf;
int32_t nResultBufSize;
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
bool orderPrjOnSTable; // projection query on stable
} SLocalMerger;
SLoserTreeInfo *pLoserTree;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SGlobalMerger;
struct SSqlObj;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
struct SSqlObj *pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
......@@ -81,15 +73,13 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
void tscDestroyLocalMerger(SSqlObj *pSql);
int32_t tscCreateGlobalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SGlobalMerger **pMerger, int64_t id);
//int32_t tscDoLocalMerge(SSqlObj *pSql);
void tscDestroyGlobalMerger(SGlobalMerger* pMerger);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCLOCALMERGE_H
#endif // TDENGINE_TSCGLOBALMERGE_H
......@@ -25,7 +25,8 @@ extern "C" {
#include "qExtbuffer.h"
#include "taosdef.h"
#include "tbuffer.h"
#include "tscLocalMerge.h"
#include "tscGlobalmerge.h"
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
......@@ -36,6 +37,9 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
......@@ -59,7 +63,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info
SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -90,22 +94,14 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
......@@ -114,7 +110,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap);
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......@@ -127,7 +123,9 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
......@@ -136,13 +134,14 @@ bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscHasColumnFilter(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
......@@ -150,9 +149,9 @@ bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType, int16_t colId);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql);
int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertData(char* sqlstr);
......@@ -171,36 +170,49 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......@@ -222,14 +234,14 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -243,12 +255,11 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
......@@ -275,11 +286,12 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
void tscInitResForMerge(SSqlRes* pRes);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlCmd* pCmd);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
......@@ -295,6 +307,11 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
......@@ -309,15 +326,20 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHEMAUTIL_H
#define TDENGINE_TSCHEMAUTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
#include "tsclient.h"
#include "ttoken.h"
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfTags(const STableMeta* pTableMeta);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/**
* get the column schema according to the column index
* @param pMeta
* @param colIndex
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHEMAUTIL_H
......@@ -40,23 +40,9 @@ extern "C" {
// forward declaration
struct SSqlInfo;
struct SLocalMerger;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo {
uint8_t numOfTags;
uint8_t precision;
int16_t numOfColumns;
int32_t rowSize;
} STableComInfo;
typedef struct SNewVgroupInfo {
int32_t vgId;
int8_t inUse;
......@@ -72,34 +58,6 @@ typedef struct CChildTableMeta {
uint64_t suid; // super table id
} CChildTableMeta;
typedef struct STableMeta {
int32_t vgId;
STableId id;
uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN]; // super table name
uint64_t suid; // super table id
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
......@@ -117,44 +75,6 @@ typedef struct SInternalField {
SExprInfo *pExpr;
} SInternalField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
// relation between tbname list and query condition, including : TK_AND or TK_OR
int16_t relType;
// tbname query condition, only support tbname query condition on one table
SCond tbnameCond;
// join condition, only support two tables join currently
SJoinInfo joinInfo;
// for different table, the query condition must be seperated
SArray *pCond;
} STagCond;
typedef struct SParamInfo {
int32_t idx;
uint8_t type;
......@@ -197,92 +117,48 @@ typedef struct STableDataBlocks {
SParamInfo *params;
} STableDataBlocks;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
} SQueryInfo;
typedef struct {
STableMeta *pTableMeta;
SVgroupsInfo *pVgroupInfo;
} STableMetaVgroupInfo;
typedef struct SInsertStatementParam {
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables; // number of tables in table name list
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not
STagData tagData; // NOTE: pTagData->data is used as a variant length array
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
char msg[512]; // error message
uint32_t insertType; // insert data from [file|sql statement| bound statement]
uint64_t objectId; // sql object id
char *sql; // current sql statement position
} SInsertStatementParam;
// TODO extract sql parser supporter
typedef struct {
int command;
uint8_t msgType;
SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
int32_t count; // todo remove it
uint32_t insertType; // TODO remove it
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
char reserve3[2]; // fix bus error on arm32
uint32_t allocSize;
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST
SQueryInfo *pQueryInfo;
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t resColumnId;
} SSqlCmd;
typedef struct SResRec {
......@@ -317,7 +193,7 @@ typedef struct {
TAOS_FIELD* final;
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
struct SLocalMerger *pLocalMerger;
struct SGlobalMerger *pMerger;
} SSqlRes;
typedef struct {
......@@ -443,8 +319,8 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void destroyTableNameList(SSqlCmd* pCmd);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -476,7 +352,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
void tscImportDataFromFile(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id);
bool tscIsUpdateQuery(SSqlObj* pSql);
char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql);
......@@ -486,10 +362,10 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscInvalidOperationMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
......@@ -505,7 +381,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo);
int16_t getNewResColId(SSqlCmd* pCmd);
#ifdef __cplusplus
}
......
......@@ -218,11 +218,19 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
* Method: closeStmt
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
#ifdef __cplusplus
}
#endif
......
......@@ -749,7 +749,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
}
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
(*env)->ReleaseStringUTFChars(env, jname, name);
return JNI_SUCCESS;
}
......@@ -762,7 +761,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
......@@ -777,14 +776,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
len = (*env)->GetArrayLength(env, lengthList);
char *lengthArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
char *lengthArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
char *nullArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
char *nullArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
......@@ -799,22 +798,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
b->length = (int32_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
// do nothing
if (!IS_VAR_DATA_TYPE(dataType)) {
int32_t bytes = tDataTypes[dataType].bytes;
for (int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
}
......@@ -878,3 +865,74 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char* curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for(int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
int32_t nTags = (int32_t) numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
......@@ -22,7 +22,7 @@
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -58,7 +58,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr;
pCmd->resColumnId = TSDB_RES_COL_ID;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
......@@ -69,7 +69,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
}
......@@ -127,7 +127,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling;
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -219,7 +220,18 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
// handle outer query based on the already retrieved nest query results.
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
......@@ -231,7 +243,8 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
* all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling; // todo refactor
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -255,7 +268,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
......@@ -317,26 +330,38 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
// update the pExpr info, colList info, number of table columns
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscSqlExprGet(pQueryInfo, i)->base);
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
// update the table uid
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) {
int32_t index = pExpr->colInfo.colIndex;
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < numOfCols || index >= (numOfCols + numOfTags)))) {
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
return pSql->retryReason;
}
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else { // do nothing for udc
}
}
}
......@@ -374,12 +399,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send the corresponding query", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -401,42 +426,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send corresponding query", pSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
assert(pCmd->command != TSDB_SQL_INSERT);
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64" redo parse sql string and proceed", pSql->self);
pCmd->parseFinished = false;
tscResetSqlCmd(pCmd, true);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
tscDebug("0x%"PRIx64" continue parse sql after get table meta", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -446,8 +437,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
......@@ -457,59 +448,52 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
} else {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
tscImportDataFromFile(pSql);
} else {
tscHandleMultivnodeInsert(pSql);
}
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo1);
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
}
}
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
} else { // stream computing
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
_error:
......
......@@ -20,7 +20,7 @@
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "taos.h"
#include "tscSubquery.h"
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -71,7 +71,9 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
numOfRows = numOfRows + tscGetNumOfTags(pMeta);
}
tscInitResObjForLocalQuery(pSql, totalNumOfRows, rowLen);
pSql->res.pMerger = tscInitResObjForLocalQuery(totalNumOfRows, rowLen, pSql->self);
tscInitResForMerge(&pSql->res);
SSchema *pSchema = tscGetTableSchema(pMeta);
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -154,14 +156,14 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
tstrncpy(f.name, "Field", sizeof(f.name));
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
......@@ -171,7 +173,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
-1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE;
......@@ -181,7 +183,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Length", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
-1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t);
......@@ -191,7 +193,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
-1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE;
......@@ -199,7 +201,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -390,7 +392,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -405,7 +407,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
......@@ -418,7 +420,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
......@@ -428,12 +430,13 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
}
tscInitResObjForLocalQuery(pSql, numOfRows, rowLen);
pSql->res.pMerger = tscInitResObjForLocalQuery(numOfRows, rowLen, pSql->self);
tscInitResForMerge(&pSql->res);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * numOfRows;
......@@ -445,7 +448,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -532,7 +535,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
}
buf[0] = 0;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
free(buf);
......@@ -553,7 +556,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -607,7 +610,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -634,7 +637,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -675,7 +678,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -704,7 +707,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -730,7 +733,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -757,7 +760,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -784,7 +787,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -807,7 +810,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -859,7 +862,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -873,7 +876,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......@@ -882,7 +885,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength);
pSql->res.pMerger = tscInitResObjForLocalQuery(1, (int32_t)valueLength, pSql->self);
tscInitResForMerge(&pSql->res);
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, 0);
pInfo->pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
......@@ -928,7 +932,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SERV_STATUS) {
pRes->code = tscProcessServStatus(pSql);
} else {
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
pRes->code = TSDB_CODE_TSC_INVALID_OPERATION;
tscError("0x%"PRIx64" not support command:%d", pSql->self, pCmd->command);
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tschemautil.h>
#include "os.h"
#include "taosmsg.h"
#include "tscLog.h"
......@@ -37,7 +36,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
return false;
}
......@@ -89,12 +88,12 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -138,7 +137,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
if (pStream->isProject) {
......@@ -197,7 +196,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -224,7 +223,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -273,7 +272,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
......@@ -444,7 +443,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -494,7 +493,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -556,7 +555,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -614,16 +613,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
pStream->stime = stime;
pStream->fp = fp;
pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStream->param = param;
pStream->pSql = pSql;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
......@@ -632,14 +631,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -151,6 +151,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qId = 0;
pRes->numOfRows = 1;
pCmd->resColumnId = TSDB_RES_COL_ID;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
......@@ -173,7 +174,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
line = __LINE__;
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto fail;
}
......@@ -266,7 +267,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->lastSyncTime = taosGetTimestampMs();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
......@@ -284,7 +285,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +305,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -503,8 +504,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlObj *pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......
此差异已折叠。
此差异已折叠。
......@@ -44,8 +44,8 @@ typedef struct SResPair {
// the structure for sql function in select clause
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
char token[TSDB_COL_NAME_LEN]; // original token
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
......@@ -92,10 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
//SSchema tGetTbnameColumnSchema();
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
bool tscValidateTableNameLength(size_t len);
......
......@@ -2569,6 +2569,7 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) {
case TSDB_BINARY_OP_REMAINDER:
return vectorRemainder;
default:
assert(0);
return NULL;
}
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <texpr.h>
#include "os.h"
#include "texpr.h"
......@@ -465,27 +466,29 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
return expr;
}
tExprNode* exprdup(tExprNode* pTree) {
if (pTree == NULL) {
tExprNode* exprdup(tExprNode* pNode) {
if (pNode == NULL) {
return NULL;
}
tExprNode* pNode = calloc(1, sizeof(tExprNode));
if (pTree->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pTree->_node.pLeft);
tExprNode* pRight = exprdup(pTree->_node.pRight);
pNode->nodeType = TSQL_NODE_EXPR;
pNode->_node.pLeft = pLeft;
pNode->_node.pRight = pRight;
} else if (pTree->nodeType == TSQL_NODE_VALUE) {
pNode->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pNode->pVal, pTree->pVal);
} else if (pTree->nodeType == TSQL_NODE_COL) {
pNode->pSchema = calloc(1, sizeof(SSchema));
*pNode->pSchema = *pTree->pSchema;
tExprNode* pCloned = calloc(1, sizeof(tExprNode));
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pNode->_node.pLeft);
tExprNode* pRight = exprdup(pNode->_node.pRight);
pCloned->_node.pLeft = pLeft;
pCloned->_node.pRight = pRight;
pCloned->_node.optr = pNode->_node.optr;
pCloned->_node.hasPK = pNode->_node.hasPK;
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
pCloned->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pCloned->pVal, pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
pCloned->pSchema = calloc(1, sizeof(SSchema));
*pCloned->pSchema = *pNode->pSchema;
}
return pNode;
pCloned->nodeType = pNode->nodeType;
return pCloned;
}
......@@ -46,7 +46,7 @@ char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
// common
int32_t tsRpcTimer = 1000;
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsRpcForceTcp = 0; //disable this, means query, show command use udp protocol as default
int32_t tsMaxShellConns = 50000;
......
此差异已折叠。
Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit 7a26c432f8b4203e42344ff3290b9b9b01b983d5
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册