提交 3fb5b54f 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

......@@ -10,3 +10,6 @@
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "deps/jemalloc"]
path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc
......@@ -59,6 +59,11 @@ IF (TD_LINUX_64)
MESSAGE(STATUS "linux64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV)
IF (JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
ENDIF ()
ENDIF ()
IF (TD_LINUX_32)
......
......@@ -72,3 +72,8 @@ IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
SET(TD_RANDOM_NETWORK_FAIL TRUE)
MESSAGE(STATUS "build with random-network-fail enabled")
ENDIF ()
IF (${JEMALLOC_ENABLED} MATCHES "true")
SET(TD_JEMALLOC_ENABLED TRUE)
MESSAGE(STATUS "build with jemalloc enabled")
ENDIF ()
......@@ -18,3 +18,16 @@ ENDIF ()
IF (TD_DARWIN AND TD_MQTT)
ADD_SUBDIRECTORY(MQTT-C)
ENDIF ()
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
MESSAGE("setup dpes/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
MESSAGE("binary dir:" ${CMAKE_BINARY_DIR})
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
ENDIF ()
Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756
......@@ -107,9 +107,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
![replica-forward.png](page://images/architecture/replica-forward.png)
1. 应用对写请求做基本的合法性检查,通过,则给请求包打上一个版本号(version, 单调递增)
1. 应用对写请求做基本的合法性检查,通过,则给请求包打上一个版本号(version, 单调递增)
2. 应用将打上版本号的写请求封装一个WAL Head, 写入WAL(Write Ahead Log)
3. 应用调用API syncForwardToPeer,如vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
3. 应用调用API syncForwardToPeer,如vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
4. vnode B收到Forward消息后,调用回调函数writeToCache, 交给应用处理
5. vnode B应用在写入成功后,都需要调用syncAckForward通知sync模块已经写入成功。
6. 如果quorum大于1,vnode B需要等待应用的回复确认,收到确认后,vnode B发送Forward Response消息给node A。
......@@ -219,7 +219,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
不同之处:
- 选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request, 如果超过半数回答Yes, 这个candidate就成为Leader,开始一个新的term. 而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组类传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会给节点组内其他节点进行广播的。Raft里不存在这样的机制,因此需要投票来解决。
- 选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request,如果超过半数回答Yes,这个candidate就成为Leader,开始一个新的term。而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组内传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会向节点组内其他节点进行广播。Raft里不存在这样的机制,因此需要投票来解决。
- 对WAL的一条记录,Raft用term + index来做唯一标识。但TDengine只用version(类似index),在TDengine实现里,仅仅用version是完全可行的, 因为TDengine的选举机制,没有term的概念。
如果整个虚拟节点组全部宕机,重启,但不是所有虚拟节点都上线,这个时候TDengine是不会选出master的,因为未上线的节点有可能有最高version的数据。而RAFT协议,只要超过半数上线,就会选出Leader。
......
......@@ -343,7 +343,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
对于采集的数据,一般有保留时长,这个时长由系统配置参数keep决定。超过这个设置天数的数据文件,将被系统自动删除,释放存储空间。
给定days与keep两个参数,一个vnode总的数据文件数为:keep/days。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
给定days与keep两个参数,一个典型工作状态的vnode中总的数据文件数为:`向上取整(keep/days)+1`。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
在每个数据文件里,一张表的数据是一块一块存储的。一张表可以有一到多个数据文件块。在一个文件块里,数据是列式存储的,占用的是一片连续的存储空间,这样大大提高读取速度。文件块的大小由系统参数maxRows(每块最大记录条数)决定,缺省值为4096。这个值不宜过大,也不宜过小。过大,定位具体时间段的数据的搜索时间会变长,影响读取速度;过小,数据块的索引太大,压缩效率偏低,也影响读取速度。
......
......@@ -516,7 +516,7 @@ conn.close()
- _TDengineCursor_ 类
参考python中help(taos.TDengineCursor)。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能线程共享使用,否则会导致返回结果出现错误。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能线程共享使用,否则会导致返回结果出现错误。
- _connect_ 方法
......
......@@ -37,7 +37,7 @@ taos> DESCRIBE meters;
- Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
- 时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如 `select * from t1 where ts > now-2w and ts <= now-1w`,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableMicrosecond 就可以支持微秒。
TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传递的 PRECISION 参数就可以支持微秒。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
......@@ -400,6 +400,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
说明:`(tb1_field1_name, ...)`的部分可以省略掉,这样就是使用全列模式写入——也即在 VALUES 部分提供的数据,必须为数据表的每个列都显式地提供数据。全列写入速度会远快于指定列,因此建议尽可能采用全列写入方式,此时空列可以填入NULL。
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
```mysql
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
......@@ -423,9 +424,9 @@ Query OK, 1 row(s) in set (0.001029s)
taos> SHOW TABLES;
Query OK, 0 row(s) in set (0.000946s)
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2);
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('a');
DB error: invalid SQL: keyword VALUES or FILE required
DB error: invalid SQL: 'a' (invalid timestamp) (0.039494s)
taos> SHOW TABLES;
table_name | created_time | columns | stable_name |
......
......@@ -6,7 +6,7 @@ set -e
#set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta]
# -l [full | lite]
......@@ -22,11 +22,12 @@ cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic]
allocator=glibc # [glibc | jemalloc]
dbName=taos # [taos | power]
verNumber=""
verNumberComp="2.0.0.0"
while getopts "hv:V:c:o:l:s:d:n:m:" arg
while getopts "hv:V:c:o:l:s:d:a:n:m:" arg
do
case $arg in
v)
......@@ -53,6 +54,10 @@ do
#echo "dbName=$OPTARG"
dbName=$(echo $OPTARG)
;;
a)
#echo "allocator=$OPTARG"
allocator=$(echo $OPTARG)
;;
n)
#echo "verNumber=$OPTARG"
verNumber=$(echo $OPTARG)
......@@ -71,20 +76,21 @@ do
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] "
echo " -l [full | lite] "
echo " -a [glibc | jemalloc] "
echo " -s [static | dynamic] "
echo " -d [taos | power] "
echo " -n [version number] "
echo " -m [compatible version number] "
exit 0
;;
?) #unknow option
?) #unknow option
echo "unkonw argument"
exit 1
;;
esac
done
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} verNumber=${verNumber} verNumberComp=${verNumberComp}"
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} allocator=${allocator} verNumber=${verNumber} verNumberComp=${verNumberComp}"
curr_dir=$(pwd)
......@@ -118,7 +124,7 @@ function vercomp () {
echo 0
exit 0
fi
local IFS=.
local i ver1=($1) ver2=($2)
......@@ -164,7 +170,7 @@ if [[ "$verMode" == "cluster" ]]; then
else
gitinfoOfInternal=NULL
fi
cd ${curr_dir}
# 2. cmake executable file
......@@ -180,12 +186,18 @@ else
fi
cd ${compile_dir}
if [[ "$allocator" == "jemalloc" ]]; then
allocator_macro="-DJEMALLOC_ENABLED=true"
else
allocator_macro=""
fi
# check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then
if [ "$verMode" != "cluster" ]; then
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode}
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} ${allocator_macro}
else
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp}
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} ${allocator_macro}
fi
else
echo "input cpuType=${cpuType} error!!!"
......@@ -199,9 +211,9 @@ cd ${curr_dir}
# 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
ret='0'
ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
......@@ -214,9 +226,9 @@ if [ "$osType" != "Darwin" ]; then
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
......@@ -229,11 +241,11 @@ if [ "$osType" != "Darwin" ]; then
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
echo "====do tar.gz package for all systems===="
cd ${script_dir}/tools
if [[ "$dbName" == "taos" ]]; then
if [[ "$dbName" == "taos" ]]; then
${csudo} ./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
......
#!/bin/bash
#
# This file is used to install TAOS time-series database on linux systems. The operating system
# This file is used to install TAOS time-series database on linux systems. The operating system
# is required to use systemd to manage services at boot
set -e
# set -x
# -----------------------Variables definition---------------------
# -----------------------Variables definition
source_dir=$1
binary_dir=$2
osType=$3
......@@ -71,9 +71,9 @@ if [ "$osType" != "Darwin" ]; then
service_mod=0
elif $(which service &> /dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
service_config_dir="/etc/init.d"
if $(which chkconfig &> /dev/null); then
initd_mod=1
initd_mod=1
elif $(which insserv &> /dev/null); then
initd_mod=2
elif $(which update-rc.d &> /dev/null); then
......@@ -123,9 +123,9 @@ function kill_taosd() {
function install_main_path() {
#create install main dir and all sub dir
${csudo} rm -rf ${install_main_dir} || :
${csudo} mkdir -p ${install_main_dir}
${csudo} mkdir -p ${install_main_dir}
${csudo} mkdir -p ${install_main_dir}/cfg
${csudo} mkdir -p ${install_main_dir}/bin
${csudo} mkdir -p ${install_main_dir}/bin
${csudo} mkdir -p ${install_main_dir}/connector
${csudo} mkdir -p ${install_main_dir}/driver
${csudo} mkdir -p ${install_main_dir}/examples
......@@ -176,6 +176,49 @@ function install_bin() {
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
fi
}
function install_jemalloc() {
if [ "$osType" != "Darwin" ]; then
/usr/bin/install -c -d /usr/local/bin
if [ -f ${binary_dir}/build/bin/jemalloc-config ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc-config /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jemalloc.sh ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc.sh /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jeprof ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jeprof /usr/local/bin
fi
if [ -f ${binary_dir}/build/include/jemalloc/jemalloc.h ]; then
/usr/bin/install -c -d /usr/local/include/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/include/jemalloc/jemalloc.h /usr/local/include/jemalloc
fi
if [ -f ${binary_dir}/build/lib/libjemalloc.so.2 ]; then
/usr/bin/install -c -d /usr/local/lib
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
/usr/bin/install -c -d /usr/local/lib
if [ -f ${binary_dir}/build/lib/libjemalloc.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
/usr/bin/install -c -d /usr/local/lib/pkgconfig
/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi
fi
if [ -f ${binary_dir}/build/share/doc/jemalloc/jemalloc.html ]; then
/usr/bin/install -c -d /usr/local/share/doc/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc
fi
if [ -f ${binary_dir}/build/share/man/man3/jemalloc.3 ]; then
/usr/bin/install -c -d /usr/local/share/man/man3
/usr/bin/install -c -m 644 ${binary_dir}/build/share/man/man3/jemalloc.3 /usr/local/share/man/man3
fi
fi
}
function install_lib() {
# Remove links
......@@ -183,12 +226,12 @@ function install_lib() {
if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${lib64_link_dir}/libtaos.* || :
fi
if [ "$osType" != "Darwin" ]; then
${csudo} cp ${binary_dir}/build/lib/libtaos.so.${verNumber} ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
if [ -d "${lib64_link_dir}" ]; then
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
......@@ -198,7 +241,9 @@ function install_lib() {
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
install_jemalloc
if [ "$osType" != "Darwin" ]; then
${csudo} ldconfig
fi
......@@ -206,26 +251,26 @@ function install_lib() {
function install_header() {
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
}
function install_config() {
#${csudo} rm -f ${install_main_dir}/cfg/taos.cfg || :
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/*
fi
fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
}
function install_log() {
function install_log() {
${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then
......@@ -239,7 +284,7 @@ function install_log() {
function install_data() {
${csudo} mkdir -p ${data_dir}
${csudo} ln -s ${data_dir} ${install_main_dir}/data
${csudo} ln -s ${data_dir} ${install_main_dir}/data
}
function install_connector() {
......@@ -254,8 +299,8 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!"
fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
}
function install_examples() {
......@@ -264,8 +309,8 @@ function install_examples() {
function clean_service_on_sysvinit() {
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || :
#${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || :
if pidof taosd &> /dev/null; then
${csudo} service taosd stop || :
fi
......@@ -277,9 +322,9 @@ function clean_service_on_sysvinit() {
elif ((${initd_mod}==3)); then
${csudo} update-rc.d -f taosd remove || :
fi
${csudo} rm -f ${service_config_dir}/taosd || :
if $(which init &> /dev/null); then
${csudo} init q || :
fi
......@@ -298,10 +343,10 @@ function install_service_on_sysvinit() {
${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} grep -q -F "$restart_config_str" /etc/inittab || ${csudo} bash -c "echo '${restart_config_str}' >> /etc/inittab"
if ((${initd_mod}==1)); then
${csudo} chkconfig --add taosd || :
${csudo} chkconfig --level 2345 taosd on || :
......@@ -323,7 +368,7 @@ function clean_service_on_systemd() {
${csudo} systemctl disable taosd &> /dev/null || echo &> /dev/null
${csudo} rm -f ${taosd_service_config}
}
}
# taos:2345:respawn:/etc/init.d/taosd start
......@@ -383,7 +428,7 @@ function update_TDengine() {
sleep 1
fi
fi
install_main_path
install_log
......@@ -431,16 +476,16 @@ function install_TDengine() {
# Start to install
if [ "$osType" != "Darwin" ]; then
echo -e "${GREEN}Start to install TDEngine...${NC}"
else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
fi
install_main_path
install_main_path
if [ "$osType" != "Darwin" ]; then
if [ "$osType" != "Darwin" ]; then
install_data
fi
install_log
install_log
install_header
install_lib
install_connector
......@@ -452,7 +497,7 @@ function install_TDengine() {
install_service
fi
install_config
install_config
if [ "$osType" != "Darwin" ]; then
# Ask if to start the service
......
......@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
......
......@@ -1286,7 +1286,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
}
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_VALUES) {
if (sToken.n <= 0 || (sToken.type != TK_VALUES && sToken.type != TK_LP)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......
......@@ -41,11 +41,11 @@
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
// -1 is tbname column index, so here use the -3 as the initial value
#define COLUMN_INDEX_INITIAL_VAL (-3)
// -1 is tbname column index, so here use the -2 as the initial value
#define COLUMN_INDEX_INITIAL_VAL (-2)
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_BLOCK_DIST_COLUMN_INDEX))
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList { // todo refactor
......@@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
......@@ -851,6 +852,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
const char* msg3 = "not support state_window with group by ";
const char* msg4 = "function not support for super table query";
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
if (col->z == NULL || col->n <= 0) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->groupbyExpr.numOfGroupCols = 1;
//TODO(dengyihao): check tag column
if (isStable) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
pQueryInfo->stateWindow = true;
return TSDB_CODE_SUCCESS;
}
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
const char* msg1 = "gap should be fixed time window";
......@@ -885,11 +939,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (pQueryInfo->sessionWindow.gap == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
......@@ -1873,9 +1933,6 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema* colSchema = tGetTbnameColumnSchema();
tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema colSchema = tGetBlockDistColumnSchema();
tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
} else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -2487,7 +2544,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX,};
SColumnIndex index = {.tableIndex = 0, .columnIndex = 0,};
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema s = {.name = "block_dist", .type = TSDB_DATA_TYPE_BINARY};
......@@ -2495,10 +2552,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int16_t resType = 0;
int16_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0);
s.bytes = bytes;
s.type = (uint8_t)resType;
SExprInfo* pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, &s, TSDB_COL_TAG, getNewResColId(pCmd));
SExprInfo* pExpr = tscExprInsert(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, resType,
bytes, getNewResColId(pCmd), bytes, 0);
tstrncpy(pExpr->base.aliasName, s.name, sizeof(pExpr->base.aliasName));
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
insertResultField(pQueryInfo, 0, &ids, bytes, s.type, s.name, pExpr);
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = pTableMetaInfo->pTableMeta->tableInfo.rowSize;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
......@@ -2545,14 +2608,6 @@ static bool isTablenameToken(SStrToken* token) {
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
}
static bool isTableBlockDistToken(SStrToken* token) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken);
return (strncasecmp(TSQL_BLOCK_DIST, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_BLOCK_DIST_L));
}
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
......@@ -2582,8 +2637,6 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (isTableBlockDistToken(pToken)) {
pIndex->columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else {
......@@ -2903,6 +2956,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
return true;
}
}
} else if (tscIsSessionWindowQuery(pQueryInfo)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true;
}
return false;
......@@ -6163,7 +6219,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by not allowed on projection query";
const char* msg3 = "group by/session/state_window not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up";
......@@ -6179,6 +6235,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
}
if (tscIsProjectionQuery(pQueryInfo) && tscIsSessionWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
// check if all the tags prj columns belongs to the group by columns
......@@ -6748,6 +6807,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -7551,7 +7611,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
TSDB_CODE_SUCCESS) {
......@@ -7592,6 +7655,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
*/
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
......@@ -7599,10 +7666,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
}
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// no result due to invalid query time range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
......
......@@ -856,6 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
......
......@@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->sessionWindow.gap > 0;
}
bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
......@@ -527,7 +530,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
bool isBlockDistQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
return (numOfExprs == 1 && pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
return (numOfExprs == 1 && pExpr->base.functionId == TSDB_FUNC_BLKINFO);
}
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
......@@ -2073,16 +2076,14 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
p->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
p->colBytes = s->bytes;
p->colType = s->type;
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
p->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
p->colBytes = s.bytes;
p->colType = s.type;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
p->colInfo.colId = pColIndex->columnIndex;
p->colBytes = size;
p->colType = type;
} else if (functionId == TSDB_FUNC_BLKINFO) {
p->colInfo.colId = pColIndex->columnIndex;
p->colBytes = TSDB_MAX_BINARY_LEN;
p->colType = TSDB_DATA_TYPE_BINARY;
} else {
if (TSDB_COL_IS_TAG(colType)) {
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......@@ -2578,7 +2579,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_BLOCK_DIST_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
return true;
}
......@@ -4238,11 +4239,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
......@@ -4250,8 +4253,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
......
......@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
bool tscValidateTableNameLength(size_t len);
......
......@@ -33,15 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
return strlen(prefix);
}
SSchema tGetBlockDistColumnSchema() {
SSchema s = {0};
s.bytes = TSDB_MAX_BINARY_LEN;;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
tstrncpy(s.name, TSQL_BLOCK_DIST_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name) {
SSchema s = {0};
......
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
......@@ -10,8 +10,15 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync)
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync ${LINK_JEMALLOC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd taos_static)
......
......@@ -244,7 +244,6 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -473,6 +473,7 @@ typedef struct {
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
STimeWindow window;
int32_t numOfTables;
......
......@@ -215,7 +215,7 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numOfRows;
int32_t numBlocksOfStep;
} SFileBlockInfo;
typedef struct {
......@@ -229,11 +229,15 @@ typedef struct {
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
SArray *dataBlockInfos;
......
......@@ -136,74 +136,74 @@
#define TK_VARIABLE 117
#define TK_INTERVAL 118
#define TK_SESSION 119
#define TK_FILL 120
#define TK_SLIDING 121
#define TK_ORDER 122
#define TK_BY 123
#define TK_ASC 124
#define TK_DESC 125
#define TK_GROUP 126
#define TK_HAVING 127
#define TK_LIMIT 128
#define TK_OFFSET 129
#define TK_SLIMIT 130
#define TK_SOFFSET 131
#define TK_WHERE 132
#define TK_NOW 133
#define TK_RESET 134
#define TK_QUERY 135
#define TK_SYNCDB 136
#define TK_ADD 137
#define TK_COLUMN 138
#define TK_TAG 139
#define TK_CHANGE 140
#define TK_SET 141
#define TK_KILL 142
#define TK_CONNECTION 143
#define TK_STREAM 144
#define TK_COLON 145
#define TK_ABORT 146
#define TK_AFTER 147
#define TK_ATTACH 148
#define TK_BEFORE 149
#define TK_BEGIN 150
#define TK_CASCADE 151
#define TK_CLUSTER 152
#define TK_CONFLICT 153
#define TK_COPY 154
#define TK_DEFERRED 155
#define TK_DELIMITERS 156
#define TK_DETACH 157
#define TK_EACH 158
#define TK_END 159
#define TK_EXPLAIN 160
#define TK_FAIL 161
#define TK_FOR 162
#define TK_IGNORE 163
#define TK_IMMEDIATE 164
#define TK_INITIALLY 165
#define TK_INSTEAD 166
#define TK_MATCH 167
#define TK_KEY 168
#define TK_OF 169
#define TK_RAISE 170
#define TK_REPLACE 171
#define TK_RESTRICT 172
#define TK_ROW 173
#define TK_STATEMENT 174
#define TK_TRIGGER 175
#define TK_VIEW 176
#define TK_SEMI 177
#define TK_NONE 178
#define TK_PREV 179
#define TK_LINEAR 180
#define TK_IMPORT 181
#define TK_TBNAME 182
#define TK_JOIN 183
#define TK_INSERT 184
#define TK_INTO 185
#define TK_VALUES 186
#define TK_STATE_WINDOW 120
#define TK_FILL 121
#define TK_SLIDING 122
#define TK_ORDER 123
#define TK_BY 124
#define TK_ASC 125
#define TK_DESC 126
#define TK_GROUP 127
#define TK_HAVING 128
#define TK_LIMIT 129
#define TK_OFFSET 130
#define TK_SLIMIT 131
#define TK_SOFFSET 132
#define TK_WHERE 133
#define TK_NOW 134
#define TK_RESET 135
#define TK_QUERY 136
#define TK_SYNCDB 137
#define TK_ADD 138
#define TK_COLUMN 139
#define TK_TAG 140
#define TK_CHANGE 141
#define TK_SET 142
#define TK_KILL 143
#define TK_CONNECTION 144
#define TK_STREAM 145
#define TK_COLON 146
#define TK_ABORT 147
#define TK_AFTER 148
#define TK_ATTACH 149
#define TK_BEFORE 150
#define TK_BEGIN 151
#define TK_CASCADE 152
#define TK_CLUSTER 153
#define TK_CONFLICT 154
#define TK_COPY 155
#define TK_DEFERRED 156
#define TK_DELIMITERS 157
#define TK_DETACH 158
#define TK_EACH 159
#define TK_END 160
#define TK_EXPLAIN 161
#define TK_FAIL 162
#define TK_FOR 163
#define TK_IGNORE 164
#define TK_IMMEDIATE 165
#define TK_INITIALLY 166
#define TK_INSTEAD 167
#define TK_MATCH 168
#define TK_KEY 169
#define TK_OF 170
#define TK_RAISE 171
#define TK_REPLACE 172
#define TK_RESTRICT 173
#define TK_ROW 174
#define TK_STATEMENT 175
#define TK_TRIGGER 176
#define TK_VIEW 177
#define TK_SEMI 178
#define TK_NONE 179
#define TK_PREV 180
#define TK_LINEAR 181
#define TK_IMPORT 182
#define TK_TBNAME 183
#define TK_JOIN 184
#define TK_INSERT 185
#define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300
#define TK_COMMENT 301
......
......@@ -11,10 +11,17 @@ IF (TD_LINUX)
LIST(REMOVE_ITEM SRC ./src/shellDarwin.c)
ADD_EXECUTABLE(shell ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(shell taos_static)
TARGET_LINK_LIBRARIES(shell taos_static ${LINK_JEMALLOC})
ELSE ()
TARGET_LINK_LIBRARIES(shell taos)
TARGET_LINK_LIBRARIES(shell taos ${LINK_JEMALLOC})
ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
......
......@@ -55,14 +55,21 @@ ENDIF ()
MESSAGE("TD_VERSION_NUMBER is:" ${TD_VERSION_NUMBER})
ADD_DEFINITIONS(-DTD_VERNUMBER="${TD_VERSION_NUMBER}")
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson ${LINK_JEMALLOC})
ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson)
TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC})
ENDIF ()
ELSEIF (TD_WINDOWS)
AUX_SOURCE_DIRECTORY(. SRC)
......@@ -71,7 +78,7 @@ ELSEIF (TD_WINDOWS)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson)
ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson})
TARGET_LINK_LIBRARIES(taosdemo taos cJson)
ENDIF ()
ELSEIF (TD_DARWIN)
# missing a few dependencies, such as <argp.h>
......
......@@ -1060,6 +1060,13 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
newCfg.partitions = partitions;
}
// community version can only change daysToKeep
// but enterprise version can change all daysToKeep options
#ifndef _STORAGE
newCfg.daysToKeep1 = newCfg.daysToKeep;
newCfg.daysToKeep2 = newCfg.daysToKeep;
#endif
return newCfg;
}
......
......@@ -22,6 +22,10 @@
extern "C" {
#endif
#ifdef TD_JEMALLOC_ENABLED
#include <jemalloc/jemalloc.h>
#endif
typedef enum {
TAOS_ALLOC_MODE_DEFAULT = 0,
TAOS_ALLOC_MODE_RANDOM_FAIL = 1,
......
......@@ -189,6 +189,7 @@ typedef struct SQueryAttr {
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
......@@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E {
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_StateWindow = 22,
};
typedef struct SOperatorInfo {
......@@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo {
int32_t start; // start row index
} SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
} SStateWindowOperatorInfo ;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
SSDataBlock *pRes;
......@@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
......
......@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal {
SStrToken gap;
} SSessionWindowVal;
typedef struct SWindowStateVal {
SStrToken col;
} SWindowStateVal;
struct SRelationInfo;
typedef struct SSqlNode {
......@@ -100,6 +104,7 @@ typedef struct SSqlNode {
SArray *fillType; // fill type[optional], SArray<tVariantListItem>
SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional]
SWindowStateVal windowstateVal; // window_state(col) [optional]
SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional]
......@@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc
void tSqlExprListDestroy(SArray *pList);
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
......
......@@ -138,6 +138,7 @@ typedef struct SQueryInfo {
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool stateWindow;
} SQueryInfo;
/**
......
......@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*}
%destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
}
select(A) ::= LP select(B) RP. {A = B;}
......@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version()
// select server_state()
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return
......@@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.col = V;
X.gap = Y;
}
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . {X.col.n = 0;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. {
X.col = V;
}
%type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);}
......
......@@ -19,6 +19,7 @@
#include "texpr.h"
#include "ttype.h"
#include "tsdb.h"
#include "tglobal.h"
#include "qAggMain.h"
#include "qFill.h"
......@@ -3297,8 +3298,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
if (pCtx->numOfParams == 2) {
return;
}
if (pCtx->param[0].i64 == 1) {
SET_VAL(pCtx, pCtx->size, 1);
} else {
INC_INIT_VAL(pCtx, pCtx->size);
}
INC_INIT_VAL(pCtx, pCtx->size);
char *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->order == TSDB_ORDER_ASC) {
......@@ -4827,51 +4832,81 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
pResInfo->hasResult = DATA_SET_FLAG;
}
static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* pSrc) {
static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockDist* pSrc) {
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables;
pDist->numOfRowsInMemTable += pSrc->numOfRowsInMemTable;
pDist->numOfFiles += pSrc->numOfFiles;
pDist->totalSize += pSrc->totalSize;
pDist->totalRows += pSrc->totalRows;
if (pDist->dataBlockInfos == NULL) {
pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
if (pResInfo->hasResult == DATA_SET_FLAG) {
pDist->maxRows = MAX(pDist->maxRows, pSrc->maxRows);
pDist->minRows = MIN(pDist->minRows, pSrc->minRows);
} else {
pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows;
int32_t numSteps = tsMaxRowsInFileBlock/TSDB_BLOCK_DIST_STEP_ROWS;
pDist->dataBlockInfos = taosArrayInit(numSteps, sizeof(SFileBlockInfo));
taosArraySetSize(pDist->dataBlockInfos, numSteps);
}
taosArrayAddBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos));
size_t steps = taosArrayGetSize(pDist->dataBlockInfos);
for (int32_t i = 0; i < steps; ++i) {
int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i);
blockInfo->numBlocksOfStep += srcNumBlocks;
}
}
void block_func_merge(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDist info = {0};
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
mergeTableBlockDist(pDist, &info);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
mergeTableBlockDist(pResInfo, &info);
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
}
static int32_t doGetPercentile(const SArray* pArray, double rate) {
int32_t len = (int32_t)taosArrayGetSize(pArray);
if (len <= 0) {
return 0;
void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
double* percents, int32_t* percentiles) {
if (totalBlocks == 0) {
for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = 0;
}
return;
}
assert(rate >= 0 && rate <= 1.0);
int idx = (int32_t)((len - 1) * rate);
SArray *blocksInfos = pTableBlockDist->dataBlockInfos;
size_t numSteps = taosArrayGetSize(blocksInfos);
size_t cumulativeBlocks = 0;
return ((SFileBlockInfo *)(taosArrayGet(pArray, idx)))->numOfRows;
}
int percentIndex = 0;
for (int32_t indexStep = 0; indexStep < numSteps; ++indexStep) {
int32_t numStepBlocks = ((SFileBlockInfo *)taosArrayGet(blocksInfos, indexStep))->numBlocksOfStep;
if (numStepBlocks == 0) continue;
cumulativeBlocks += numStepBlocks;
static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SFileBlockInfo *)pLeft)->numOfRows;
int32_t right = ((SFileBlockInfo *)pRight)->numOfRows;
while (percentIndex < numOfPercents) {
double blockRank = totalBlocks * percents[percentIndex];
if (blockRank <= cumulativeBlocks) {
percentiles[percentIndex] = indexStep;
++percentIndex;
} else {
break;
}
}
}
if (left > right) return 1;
if (left < right) return -1;
return 0;
for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = (percentiles[i]+1) * TSDB_BLOCK_DIST_STEP_ROWS - TSDB_BLOCK_DIST_STEP_ROWS/2;
}
}
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
......@@ -4879,40 +4914,41 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
return;
}
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
SArray* blockInfos = pTableBlockDist->dataBlockInfos;
uint64_t totalRows = pTableBlockDist->totalRows;
size_t numSteps = taosArrayGetSize(blockInfos);
int64_t totalBlocks = 0;
int64_t min = -1, max = -1, avg = 0;
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
for (int32_t i = 0; i < numSteps; i++) {
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->numOfRows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
int64_t blocks = blockInfo->numBlocksOfStep;
totalBlocks += blocks;
}
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
min = totalBlocks > 0 ? pTableBlockDist->minRows : 0;
max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0;
double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99};
int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
assert(sizeof(percents)/sizeof(double) == sizeof(percentiles)/sizeof(int32_t));
getPercentiles(pTableBlockDist, totalBlocks, sizeof(percents)/sizeof(double), percents, percentiles);
uint64_t totalLen = pTableBlockDist->totalSize;
int32_t rowSize = pTableBlockDist->rowSize;
double compRatio = (totalRows>0) ? ((double)(totalLen)/(rowSize*totalRows)) : 1;
int sz = sprintf(result + VARSTR_HEADER_SIZE,
"summary: \n\t "
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRId64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f%%]\n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"RowsInMem=[%d] \n\t SeekHeaderTime=[%d(us)]",
doGetPercentile(blockInfos, 0.05), doGetPercentile(blockInfos, 0.10),
doGetPercentile(blockInfos, 0.20), doGetPercentile(blockInfos, 0.30),
doGetPercentile(blockInfos, 0.40), doGetPercentile(blockInfos, 0.50),
doGetPercentile(blockInfos, 0.60), doGetPercentile(blockInfos, 0.70),
doGetPercentile(blockInfos, 0.80), doGetPercentile(blockInfos, 0.90),
doGetPercentile(blockInfos, 0.95), doGetPercentile(blockInfos, 0.99),
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
min, max, avg, 0.0,
totalRows, totalBlocks, totalLen/1024.0, (double)(totalLen*100.0)/(rowSize*totalRows),
totalRows, totalBlocks, totalLen/1024.0, compRatio,
pTableBlockDist->numOfRowsInMemTable, pTableBlockDist->firstSeekTimeUs);
varDataSetLen(result, sz);
UNUSED(sz);
......
此差异已折叠。
......@@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_SessionWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
......
......@@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
* extract the select info out of sql string
*/
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit,
tSqlExpr *pHaving) {
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelNodeList != NULL);
SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode));
......@@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col);
}
if (pWindowStateVal != NULL) {
pSqlNode->windowstateVal = *pWindowStateVal;
} else {
TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col);
}
return pSqlNode;
}
......
......@@ -581,6 +581,9 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32(bw, pDist->numOfTables);
tbufWriteUint16(bw, pDist->numOfFiles);
tbufWriteUint64(bw, pDist->totalSize);
tbufWriteUint64(bw, pDist->totalRows);
tbufWriteInt32(bw, pDist->maxRows);
tbufWriteInt32(bw, pDist->minRows);
tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
......@@ -616,13 +619,16 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
pDist->numOfTables = tbufReadUint32(&br);
pDist->numOfFiles = tbufReadUint16(&br);
pDist->totalSize = tbufReadUint64(&br);
pDist->totalRows = tbufReadUint64(&br);
pDist->maxRows = tbufReadInt32(&br);
pDist->minRows = tbufReadInt32(&br);
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
int64_t numOfBlocks = tbufReadUint64(&br);
int64_t numSteps = tbufReadUint64(&br);
bool comp = tbufReadUint8(&br);
uint32_t compLen = tbufReadUint32(&br);
size_t originalLen = (size_t) (numOfBlocks*sizeof(SFileBlockInfo));
size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
char* outputBuf = NULL;
if (comp) {
......@@ -633,12 +639,12 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
(int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
assert(orignalLen == numOfBlocks*sizeof(SFileBlockInfo));
assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
} else {
outputBuf = (char*) tbufReadBinary(&br, &originalLen);
}
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t) numOfBlocks, sizeof(SFileBlockInfo));
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
if (comp) {
tfree(outputBuf);
}
......
此差异已折叠。
......@@ -2128,6 +2128,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
// find the start data block in file
......@@ -2201,7 +2202,12 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
pTableBlockInfo->totalSize += pBlock[j].len;
int32_t numOfRows = pBlock[j].numOfRows;
taosArrayPush(pTableBlockInfo->dataBlockInfos, &numOfRows);
pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows;
if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows;
int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
blockInfo->numBlocksOfStep++;
}
}
}
......
......@@ -106,6 +106,14 @@ void* taosArrayGetLast(const SArray* pArray);
*/
size_t taosArrayGetSize(const SArray* pArray);
/**
* set the size of array
* @param pArray
* @param size size of the array
* @return
*/
void taosArraySetSize(SArray* pArray, size_t size);
/**
* insert data into array
* @param pArray
......
......@@ -115,6 +115,11 @@ void* taosArrayGetLast(const SArray* pArray) {
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
void taosArraySetSize(SArray* pArray, size_t size) {
assert(size <= pArray->capacity);
pArray->size = size;
}
void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
if (pArray == NULL || pData == NULL) {
return NULL;
......
......@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = {
{"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION},
{"STATE_WINDOW", TK_STATE_WINDOW},
{"FILL", TK_FILL},
{"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER},
......
......@@ -29,8 +29,8 @@ pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
WK = '/data/lib/jenkins/workspace/TDinternal'
WKC= '/data/lib/jenkins/workspace/TDinternal/community'
}
stages {
......
此差异已折叠。
......@@ -6,9 +6,9 @@ events {
}
http {
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;/blah/?.lua;;';
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;$prefix/rest/?/init.lua;;';
lua_package_cpath "$prefix/so/?.so;;";
lua_code_cache off;
lua_code_cache on;
server {
listen 7000;
server_name restapi;
......
local config = {
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024 ,
connection_pool_size = 64
}
return config
local _M = {}
local driver = require "luaconnector51"
local water_mark = 0
local occupied = 0
local connection_pool = {}
function _M.new(o,config)
o = o or {}
o.connection_pool = connection_pool
o.water_mark = water_mark
o.occupied = occupied
if #connection_pool == 0 then
for i = 1, config.connection_pool_size do
local res = driver.connect(config)
if res.code ~= 0 then
ngx.log(ngx.ERR, "connect--- failed:"..res.error)
return nil
else
local object = {obj = res.conn, state = 0}
table.insert(o.connection_pool,i, object)
ngx.log(ngx.INFO, "add connection, now pool size:"..#(o.connection_pool))
end
end
end
return setmetatable(o, { __index = _M })
end
function _M:get_connection()
local connection_obj
for i = 1, #connection_pool do
connection_obj = connection_pool[i]
if connection_obj.state == 0 then
connection_obj.state = 1
occupied = occupied +1
if occupied > water_mark then
water_mark = occupied
end
return connection_obj["obj"]
end
end
ngx.log(ngx.ERR,"ALERT! NO FREE CONNECTION.")
return nil
end
function _M:get_water_mark()
return water_mark
end
function _M:release_connection(conn)
local connection_obj
for i = 1, #connection_pool do
connection_obj = connection_pool[i]
if connection_obj["obj"] == conn then
connection_obj["state"] = 0
occupied = occupied -1
return
end
end
end
return _M
local driver = require "luaconnector51"
local cjson = require "cjson"
local Pool = require "tdpool"
local config = require "config"
ngx.say("start time:"..os.time())
local config = {
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024
}
local conn
local res = driver.connect(config)
if res.code ~=0 then
ngx.say("connect--- failed: "..res.error)
return
else
conn = res.conn
ngx.say("connect--- pass.")
end
local pool = Pool.new(Pool,config)
local conn = pool:get_connection()
local res = driver.query(conn,"drop database if exists nginx")
if res.code ~=0 then
......@@ -51,7 +36,7 @@ else
ngx.say("create table--- pass.")
end
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 0, 'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then
ngx.say("insert records failed: "..res.error)
return
......@@ -77,7 +62,29 @@ else
end
end
driver.close(conn)
ngx.say("end time:"..os.time())
--ngx.log(ngx.ERR,"in test file.")
local flag = false
function query_callback(res)
if res.code ~=0 then
ngx.say("async_query_callback--- failed:"..res.error)
else
if(res.affected == 3) then
ngx.say("async_query_callback, insert records--- pass")
else
ngx.say("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
flag = true
end
driver.query_a(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 3, 'robotspace'),('2019-09-01 00:00:00.006', 4, 'Hilink'),('2019-09-01 00:00:00.007', 6, 'Harmony')", query_callback)
while not flag do
-- ngx.say("i am here once...")
ngx.sleep(0.001) -- time unit is second
end
ngx.say("pool water_mark:"..pool:get_water_mark())
pool:release_connection(conn)
ngx.say("end time:"..os.time())
gcc -std=c99 lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos
lua_header_installed=`apt-cache policy liblua5.3-dev|grep Installed|grep none > /dev/null; echo $?`
if [ "$lua_header_installed" = "0" ]; then
echo "If need, please input root password to install liblua5.3-dev for build the connector.."
sudo apt install -y liblua5.3-dev
fi
gcc -std=c99 lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos -I/usr/include/lua5.3
此差异已折叠。
......@@ -110,7 +110,25 @@ else
end
end
function callback(t)
function async_query_callback(res)
if res.code ~=0 then
print("async_query_callback--- failed:"..res.error)
return
else
if(res.affected == 3) then
print("async_query_callback, insert records--- pass")
else
print("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
function stream_callback(t)
print("------------------------")
print("continuous query result:")
for key, value in pairs(t) do
......@@ -119,7 +137,7 @@ function callback(t)
end
local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
if res.code ~=0 then
print("open stream--- failed:"..res.error)
return
......@@ -146,4 +164,5 @@ while loop_index < 30 do
end
driver.close_stream(stream)
driver.close(conn)
......@@ -29,8 +29,8 @@ pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
WK = '/data/lib/jenkins/workspace/TDinternal'
WKC= '/data/lib/jenkins/workspace/TDinternal/community'
}
stages {
......
此差异已折叠。
......@@ -334,5 +334,6 @@ python3 ./test.py -f tag_lite/alter_tag.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
python3 test.py -f insert/insert_before_use_db.py
#======================p4-end===============
......@@ -82,6 +82,8 @@ class TDTestCase:
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
tdSql.query('select * from tbx')
tdSql.checkRows(self.rows)
#TD-4447 import the same csv twice
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
def stop(self):
self.destroyCSVFile()
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -14,3 +14,4 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册