提交 56d77cc3 编写于 作者: S Shengliang Guan

Merge from master

...@@ -10,3 +10,6 @@ ...@@ -10,3 +10,6 @@
[submodule "tests/examples/rust"] [submodule "tests/examples/rust"]
path = tests/examples/rust path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "deps/jemalloc"]
path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc
...@@ -59,6 +59,11 @@ IF (TD_LINUX_64) ...@@ -59,6 +59,11 @@ IF (TD_LINUX_64)
MESSAGE(STATUS "linux64 is defined") MESSAGE(STATUS "linux64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
IF (JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
ENDIF ()
ENDIF () ENDIF ()
IF (TD_LINUX_32) IF (TD_LINUX_32)
......
...@@ -72,3 +72,8 @@ IF (${RANDOM_NETWORK_FAIL} MATCHES "true") ...@@ -72,3 +72,8 @@ IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
SET(TD_RANDOM_NETWORK_FAIL TRUE) SET(TD_RANDOM_NETWORK_FAIL TRUE)
MESSAGE(STATUS "build with random-network-fail enabled") MESSAGE(STATUS "build with random-network-fail enabled")
ENDIF () ENDIF ()
IF (${JEMALLOC_ENABLED} MATCHES "true")
SET(TD_JEMALLOC_ENABLED TRUE)
MESSAGE(STATUS "build with jemalloc enabled")
ENDIF ()
...@@ -18,3 +18,16 @@ ENDIF () ...@@ -18,3 +18,16 @@ ENDIF ()
IF (TD_DARWIN AND TD_MQTT) IF (TD_DARWIN AND TD_MQTT)
ADD_SUBDIRECTORY(MQTT-C) ADD_SUBDIRECTORY(MQTT-C)
ENDIF () ENDIF ()
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
MESSAGE("setup dpes/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
MESSAGE("binary dir:" ${CMAKE_BINARY_DIR})
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
ENDIF ()
Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756
...@@ -6,7 +6,7 @@ set -e ...@@ -6,7 +6,7 @@ set -e
#set -x #set -x
# release.sh -v [cluster | edge] # release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta] # -V [stable | beta]
# -l [full | lite] # -l [full | lite]
...@@ -22,11 +22,12 @@ cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] ...@@ -22,11 +22,12 @@ cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite] pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic] soMode=dynamic # [static | dynamic]
allocator=glibc # [glibc | jemalloc]
dbName=taos # [taos | power] dbName=taos # [taos | power]
verNumber="" verNumber=""
verNumberComp="2.0.0.0" verNumberComp="2.0.0.0"
while getopts "hv:V:c:o:l:s:d:n:m:" arg while getopts "hv:V:c:o:l:s:d:a:n:m:" arg
do do
case $arg in case $arg in
v) v)
...@@ -53,6 +54,10 @@ do ...@@ -53,6 +54,10 @@ do
#echo "dbName=$OPTARG" #echo "dbName=$OPTARG"
dbName=$(echo $OPTARG) dbName=$(echo $OPTARG)
;; ;;
a)
#echo "allocator=$OPTARG"
allocator=$(echo $OPTARG)
;;
n) n)
#echo "verNumber=$OPTARG" #echo "verNumber=$OPTARG"
verNumber=$(echo $OPTARG) verNumber=$(echo $OPTARG)
...@@ -71,20 +76,21 @@ do ...@@ -71,20 +76,21 @@ do
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
echo " -l [full | lite] " echo " -l [full | lite] "
echo " -a [glibc | jemalloc] "
echo " -s [static | dynamic] " echo " -s [static | dynamic] "
echo " -d [taos | power] " echo " -d [taos | power] "
echo " -n [version number] " echo " -n [version number] "
echo " -m [compatible version number] " echo " -m [compatible version number] "
exit 0 exit 0
;; ;;
?) #unknow option ?) #unknow option
echo "unkonw argument" echo "unkonw argument"
exit 1 exit 1
;; ;;
esac esac
done done
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} verNumber=${verNumber} verNumberComp=${verNumberComp}" echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} allocator=${allocator} verNumber=${verNumber} verNumberComp=${verNumberComp}"
curr_dir=$(pwd) curr_dir=$(pwd)
...@@ -118,7 +124,7 @@ function vercomp () { ...@@ -118,7 +124,7 @@ function vercomp () {
echo 0 echo 0
exit 0 exit 0
fi fi
local IFS=. local IFS=.
local i ver1=($1) ver2=($2) local i ver1=($1) ver2=($2)
...@@ -164,7 +170,7 @@ if [[ "$verMode" == "cluster" ]]; then ...@@ -164,7 +170,7 @@ if [[ "$verMode" == "cluster" ]]; then
else else
gitinfoOfInternal=NULL gitinfoOfInternal=NULL
fi fi
cd ${curr_dir} cd ${curr_dir}
# 2. cmake executable file # 2. cmake executable file
...@@ -180,12 +186,18 @@ else ...@@ -180,12 +186,18 @@ else
fi fi
cd ${compile_dir} cd ${compile_dir}
if [[ "$allocator" == "jemalloc" ]]; then
allocator_macro="-DJEMALLOC_ENABLED=true"
else
allocator_macro=""
fi
# check support cpu type # check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then
if [ "$verMode" != "cluster" ]; then if [ "$verMode" != "cluster" ]; then
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} ${allocator_macro}
else else
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} ${allocator_macro}
fi fi
else else
echo "input cpuType=${cpuType} error!!!" echo "input cpuType=${cpuType} error!!!"
...@@ -199,9 +211,9 @@ cd ${curr_dir} ...@@ -199,9 +211,9 @@ cd ${curr_dir}
# 3. Call the corresponding script for packaging # 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
ret='0' ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; } command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system====" echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs" output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then if [ -d ${output_dir} ]; then
...@@ -214,9 +226,9 @@ if [ "$osType" != "Darwin" ]; then ...@@ -214,9 +226,9 @@ if [ "$osType" != "Darwin" ]; then
echo "==========dpkg command not exist, so not release deb package!!!" echo "==========dpkg command not exist, so not release deb package!!!"
fi fi
ret='0' ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; } command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system====" echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms" output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then if [ -d ${output_dir} ]; then
...@@ -229,11 +241,11 @@ if [ "$osType" != "Darwin" ]; then ...@@ -229,11 +241,11 @@ if [ "$osType" != "Darwin" ]; then
echo "==========rpmbuild command not exist, so not release rpm package!!!" echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi fi
fi fi
echo "====do tar.gz package for all systems====" echo "====do tar.gz package for all systems===="
cd ${script_dir}/tools cd ${script_dir}/tools
if [[ "$dbName" == "taos" ]]; then if [[ "$dbName" == "taos" ]]; then
${csudo} ./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${csudo} ./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${csudo} ./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
${csudo} ./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${csudo} ./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
......
#!/bin/bash #!/bin/bash
# #
# This file is used to install TAOS time-series database on linux systems. The operating system # This file is used to install TAOS time-series database on linux systems. The operating system
# is required to use systemd to manage services at boot # is required to use systemd to manage services at boot
set -e set -e
# set -x # set -x
# -----------------------Variables definition--------------------- # -----------------------Variables definition
source_dir=$1 source_dir=$1
binary_dir=$2 binary_dir=$2
osType=$3 osType=$3
...@@ -71,9 +71,9 @@ if [ "$osType" != "Darwin" ]; then ...@@ -71,9 +71,9 @@ if [ "$osType" != "Darwin" ]; then
service_mod=0 service_mod=0
elif $(which service &> /dev/null); then elif $(which service &> /dev/null); then
service_mod=1 service_mod=1
service_config_dir="/etc/init.d" service_config_dir="/etc/init.d"
if $(which chkconfig &> /dev/null); then if $(which chkconfig &> /dev/null); then
initd_mod=1 initd_mod=1
elif $(which insserv &> /dev/null); then elif $(which insserv &> /dev/null); then
initd_mod=2 initd_mod=2
elif $(which update-rc.d &> /dev/null); then elif $(which update-rc.d &> /dev/null); then
...@@ -123,9 +123,9 @@ function kill_taosd() { ...@@ -123,9 +123,9 @@ function kill_taosd() {
function install_main_path() { function install_main_path() {
#create install main dir and all sub dir #create install main dir and all sub dir
${csudo} rm -rf ${install_main_dir} || : ${csudo} rm -rf ${install_main_dir} || :
${csudo} mkdir -p ${install_main_dir} ${csudo} mkdir -p ${install_main_dir}
${csudo} mkdir -p ${install_main_dir}/cfg ${csudo} mkdir -p ${install_main_dir}/cfg
${csudo} mkdir -p ${install_main_dir}/bin ${csudo} mkdir -p ${install_main_dir}/bin
${csudo} mkdir -p ${install_main_dir}/connector ${csudo} mkdir -p ${install_main_dir}/connector
${csudo} mkdir -p ${install_main_dir}/driver ${csudo} mkdir -p ${install_main_dir}/driver
${csudo} mkdir -p ${install_main_dir}/examples ${csudo} mkdir -p ${install_main_dir}/examples
...@@ -176,6 +176,49 @@ function install_bin() { ...@@ -176,6 +176,49 @@ function install_bin() {
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
fi fi
} }
function install_jemalloc() {
if [ "$osType" != "Darwin" ]; then
/usr/bin/install -c -d /usr/local/bin
if [ -f ${binary_dir}/build/bin/jemalloc-config ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc-config /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jemalloc.sh ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc.sh /usr/local/bin
fi
if [ -f ${binary_dir}/build/bin/jeprof ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jeprof /usr/local/bin
fi
if [ -f ${binary_dir}/build/include/jemalloc/jemalloc.h ]; then
/usr/bin/install -c -d /usr/local/include/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/include/jemalloc/jemalloc.h /usr/local/include/jemalloc
fi
if [ -f ${binary_dir}/build/lib/libjemalloc.so.2 ]; then
/usr/bin/install -c -d /usr/local/lib
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
/usr/bin/install -c -d /usr/local/lib
if [ -f ${binary_dir}/build/lib/libjemalloc.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib
fi
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
/usr/bin/install -c -d /usr/local/lib/pkgconfig
/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi
fi
if [ -f ${binary_dir}/build/share/doc/jemalloc/jemalloc.html ]; then
/usr/bin/install -c -d /usr/local/share/doc/jemalloc
/usr/bin/install -c -m 644 ${binary_dir}/build/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc
fi
if [ -f ${binary_dir}/build/share/man/man3/jemalloc.3 ]; then
/usr/bin/install -c -d /usr/local/share/man/man3
/usr/bin/install -c -m 644 ${binary_dir}/build/share/man/man3/jemalloc.3 /usr/local/share/man/man3
fi
fi
}
function install_lib() { function install_lib() {
# Remove links # Remove links
...@@ -183,12 +226,12 @@ function install_lib() { ...@@ -183,12 +226,12 @@ function install_lib() {
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${lib64_link_dir}/libtaos.* || : ${csudo} rm -f ${lib64_link_dir}/libtaos.* || :
fi fi
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} cp ${binary_dir}/build/lib/libtaos.so.${verNumber} ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/* ${csudo} cp ${binary_dir}/build/lib/libtaos.so.${verNumber} ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so ${csudo} ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
if [ -d "${lib64_link_dir}" ]; then if [ -d "${lib64_link_dir}" ]; then
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 ${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so ${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
...@@ -198,7 +241,9 @@ function install_lib() { ...@@ -198,7 +241,9 @@ function install_lib() {
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib ${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib ${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi fi
install_jemalloc
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} ldconfig ${csudo} ldconfig
fi fi
...@@ -206,26 +251,26 @@ function install_lib() { ...@@ -206,26 +251,26 @@ function install_lib() {
function install_header() { function install_header() {
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || : ${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/* ${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
} }
function install_config() { function install_config() {
#${csudo} rm -f ${install_main_dir}/cfg/taos.cfg || : #${csudo} rm -f ${install_main_dir}/cfg/taos.cfg || :
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir} ${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir} [ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/* ${csudo} chmod 644 ${cfg_install_dir}/*
fi fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org ${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg ${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
} }
function install_log() { function install_log() {
${csudo} rm -rf ${log_dir} || : ${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
...@@ -239,7 +284,7 @@ function install_log() { ...@@ -239,7 +284,7 @@ function install_log() {
function install_data() { function install_data() {
${csudo} mkdir -p ${data_dir} ${csudo} mkdir -p ${data_dir}
${csudo} ln -s ${data_dir} ${install_main_dir}/data ${csudo} ln -s ${data_dir} ${install_main_dir}/data
} }
function install_connector() { function install_connector() {
...@@ -254,8 +299,8 @@ function install_connector() { ...@@ -254,8 +299,8 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!" echo "WARNING: go connector not found, please check if want to use it!"
fi fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector ${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null ${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
} }
function install_examples() { function install_examples() {
...@@ -264,8 +309,8 @@ function install_examples() { ...@@ -264,8 +309,8 @@ function install_examples() {
function clean_service_on_sysvinit() { function clean_service_on_sysvinit() {
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start" #restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || : #${csudo} sed -i "\|${restart_config_str}|d" /etc/inittab || :
if pidof taosd &> /dev/null; then if pidof taosd &> /dev/null; then
${csudo} service taosd stop || : ${csudo} service taosd stop || :
fi fi
...@@ -277,9 +322,9 @@ function clean_service_on_sysvinit() { ...@@ -277,9 +322,9 @@ function clean_service_on_sysvinit() {
elif ((${initd_mod}==3)); then elif ((${initd_mod}==3)); then
${csudo} update-rc.d -f taosd remove || : ${csudo} update-rc.d -f taosd remove || :
fi fi
${csudo} rm -f ${service_config_dir}/taosd || : ${csudo} rm -f ${service_config_dir}/taosd || :
if $(which init &> /dev/null); then if $(which init &> /dev/null); then
${csudo} init q || : ${csudo} init q || :
fi fi
...@@ -298,10 +343,10 @@ function install_service_on_sysvinit() { ...@@ -298,10 +343,10 @@ function install_service_on_sysvinit() {
${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d ${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd ${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
fi fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start" #restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
#${csudo} grep -q -F "$restart_config_str" /etc/inittab || ${csudo} bash -c "echo '${restart_config_str}' >> /etc/inittab" #${csudo} grep -q -F "$restart_config_str" /etc/inittab || ${csudo} bash -c "echo '${restart_config_str}' >> /etc/inittab"
if ((${initd_mod}==1)); then if ((${initd_mod}==1)); then
${csudo} chkconfig --add taosd || : ${csudo} chkconfig --add taosd || :
${csudo} chkconfig --level 2345 taosd on || : ${csudo} chkconfig --level 2345 taosd on || :
...@@ -323,7 +368,7 @@ function clean_service_on_systemd() { ...@@ -323,7 +368,7 @@ function clean_service_on_systemd() {
${csudo} systemctl disable taosd &> /dev/null || echo &> /dev/null ${csudo} systemctl disable taosd &> /dev/null || echo &> /dev/null
${csudo} rm -f ${taosd_service_config} ${csudo} rm -f ${taosd_service_config}
} }
# taos:2345:respawn:/etc/init.d/taosd start # taos:2345:respawn:/etc/init.d/taosd start
...@@ -383,7 +428,7 @@ function update_TDengine() { ...@@ -383,7 +428,7 @@ function update_TDengine() {
sleep 1 sleep 1
fi fi
fi fi
install_main_path install_main_path
install_log install_log
...@@ -431,16 +476,16 @@ function install_TDengine() { ...@@ -431,16 +476,16 @@ function install_TDengine() {
# Start to install # Start to install
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
echo -e "${GREEN}Start to install TDEngine...${NC}" echo -e "${GREEN}Start to install TDEngine...${NC}"
else else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}" echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
fi fi
install_main_path install_main_path
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
install_data install_data
fi fi
install_log install_log
install_header install_header
install_lib install_lib
install_connector install_connector
...@@ -452,7 +497,7 @@ function install_TDengine() { ...@@ -452,7 +497,7 @@ function install_TDengine() {
install_service install_service
fi fi
install_config install_config
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
# Ask if to start the service # Ask if to start the service
......
...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/ */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
......
...@@ -1286,7 +1286,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1286,7 +1286,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
} }
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_VALUES) { if (sToken.n <= 0 || (sToken.type != TK_VALUES && sToken.type != TK_LP)) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
......
...@@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm ...@@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
...@@ -858,6 +859,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS ...@@ -858,6 +859,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
// The following part is used to check for the invalid query expression. // The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
} }
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
const char* msg3 = "not support state_window with group by ";
const char* msg4 = "function not support for super table query";
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
if (col->z == NULL || col->n <= 0) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->groupbyExpr.numOfGroupCols = 1;
//TODO(dengyihao): check tag column
if (isStable) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
pQueryInfo->stateWindow = true;
return TSDB_CODE_SUCCESS;
}
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) { int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
const char* msg1 = "gap should be fixed time window"; const char* msg1 = "gap should be fixed time window";
...@@ -892,11 +946,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS ...@@ -892,11 +946,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) { if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
if (pQueryInfo->sessionWindow.gap == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER; SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
...@@ -2903,6 +2963,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) ...@@ -2903,6 +2963,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
return true; return true;
} }
} }
} else if (tscIsSessionWindowQuery(pQueryInfo)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true;
} }
return false; return false;
...@@ -6163,7 +6226,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) { ...@@ -6163,7 +6226,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "functions/columns not allowed in group by query"; const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed"; const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by not allowed on projection query"; const char* msg3 = "group by/session/state_window not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query"; const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up"; const char* msg5 = "functions can not be mixed up";
...@@ -6179,6 +6242,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6179,6 +6242,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
if (tscIsProjectionQuery(pQueryInfo) && tscIsSessionWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
// check if all the tags prj columns belongs to the group by columns // check if all the tags prj columns belongs to the group by columns
...@@ -6748,6 +6814,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6748,6 +6814,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -7536,7 +7603,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7536,7 +7603,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set order by info // set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
...@@ -7577,6 +7647,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7577,6 +7647,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
* transfer sql functions that need secondary merge into another format * transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last * in dealing with super table queries such as: count/first/last
*/ */
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isSTable) { if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo); tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) { if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
...@@ -7584,10 +7658,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7584,10 +7658,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// no result due to invalid query time range // no result due to invalid query time range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
......
...@@ -857,6 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -857,6 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->simpleAgg = query.simpleAgg; pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery; pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan; pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sqlstrLen = htonl(sqlLen);
......
...@@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { ...@@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->sessionWindow.gap > 0;
}
bool tscNeedReverseScan(SQueryInfo* pQueryInfo) { bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo); size_t numOfExprs = tscNumOfExprs(pQueryInfo);
...@@ -4202,11 +4205,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4202,11 +4205,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo); pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinctTag = pQueryInfo->distinctTag; pQueryAttr->distinctTag = pQueryInfo->distinctTag;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
...@@ -4214,8 +4219,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4214,8 +4219,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->slimit = pQueryInfo->slimit; pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order; pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum; pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window; pQueryAttr->window = pQueryInfo->window;
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
...@@ -10,8 +10,15 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) ...@@ -10,8 +10,15 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync) TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync ${LINK_JEMALLOC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd taos_static) TARGET_LINK_LIBRARIES(taosd taos_static)
......
...@@ -473,6 +473,7 @@ typedef struct { ...@@ -473,6 +473,7 @@ typedef struct {
bool simpleAgg; bool simpleAgg;
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
STimeWindow window; STimeWindow window;
int32_t numOfTables; int32_t numOfTables;
......
...@@ -136,74 +136,74 @@ ...@@ -136,74 +136,74 @@
#define TK_VARIABLE 117 #define TK_VARIABLE 117
#define TK_INTERVAL 118 #define TK_INTERVAL 118
#define TK_SESSION 119 #define TK_SESSION 119
#define TK_FILL 120 #define TK_STATE_WINDOW 120
#define TK_SLIDING 121 #define TK_FILL 121
#define TK_ORDER 122 #define TK_SLIDING 122
#define TK_BY 123 #define TK_ORDER 123
#define TK_ASC 124 #define TK_BY 124
#define TK_DESC 125 #define TK_ASC 125
#define TK_GROUP 126 #define TK_DESC 126
#define TK_HAVING 127 #define TK_GROUP 127
#define TK_LIMIT 128 #define TK_HAVING 128
#define TK_OFFSET 129 #define TK_LIMIT 129
#define TK_SLIMIT 130 #define TK_OFFSET 130
#define TK_SOFFSET 131 #define TK_SLIMIT 131
#define TK_WHERE 132 #define TK_SOFFSET 132
#define TK_NOW 133 #define TK_WHERE 133
#define TK_RESET 134 #define TK_NOW 134
#define TK_QUERY 135 #define TK_RESET 135
#define TK_SYNCDB 136 #define TK_QUERY 136
#define TK_ADD 137 #define TK_SYNCDB 137
#define TK_COLUMN 138 #define TK_ADD 138
#define TK_TAG 139 #define TK_COLUMN 139
#define TK_CHANGE 140 #define TK_TAG 140
#define TK_SET 141 #define TK_CHANGE 141
#define TK_KILL 142 #define TK_SET 142
#define TK_CONNECTION 143 #define TK_KILL 143
#define TK_STREAM 144 #define TK_CONNECTION 144
#define TK_COLON 145 #define TK_STREAM 145
#define TK_ABORT 146 #define TK_COLON 146
#define TK_AFTER 147 #define TK_ABORT 147
#define TK_ATTACH 148 #define TK_AFTER 148
#define TK_BEFORE 149 #define TK_ATTACH 149
#define TK_BEGIN 150 #define TK_BEFORE 150
#define TK_CASCADE 151 #define TK_BEGIN 151
#define TK_CLUSTER 152 #define TK_CASCADE 152
#define TK_CONFLICT 153 #define TK_CLUSTER 153
#define TK_COPY 154 #define TK_CONFLICT 154
#define TK_DEFERRED 155 #define TK_COPY 155
#define TK_DELIMITERS 156 #define TK_DEFERRED 156
#define TK_DETACH 157 #define TK_DELIMITERS 157
#define TK_EACH 158 #define TK_DETACH 158
#define TK_END 159 #define TK_EACH 159
#define TK_EXPLAIN 160 #define TK_END 160
#define TK_FAIL 161 #define TK_EXPLAIN 161
#define TK_FOR 162 #define TK_FAIL 162
#define TK_IGNORE 163 #define TK_FOR 163
#define TK_IMMEDIATE 164 #define TK_IGNORE 164
#define TK_INITIALLY 165 #define TK_IMMEDIATE 165
#define TK_INSTEAD 166 #define TK_INITIALLY 166
#define TK_MATCH 167 #define TK_INSTEAD 167
#define TK_KEY 168 #define TK_MATCH 168
#define TK_OF 169 #define TK_KEY 169
#define TK_RAISE 170 #define TK_OF 170
#define TK_REPLACE 171 #define TK_RAISE 171
#define TK_RESTRICT 172 #define TK_REPLACE 172
#define TK_ROW 173 #define TK_RESTRICT 173
#define TK_STATEMENT 174 #define TK_ROW 174
#define TK_TRIGGER 175 #define TK_STATEMENT 175
#define TK_VIEW 176 #define TK_TRIGGER 176
#define TK_SEMI 177 #define TK_VIEW 177
#define TK_NONE 178 #define TK_SEMI 178
#define TK_PREV 179 #define TK_NONE 179
#define TK_LINEAR 180 #define TK_PREV 180
#define TK_IMPORT 181 #define TK_LINEAR 181
#define TK_TBNAME 182 #define TK_IMPORT 182
#define TK_JOIN 183 #define TK_TBNAME 183
#define TK_INSERT 184 #define TK_JOIN 184
#define TK_INTO 185 #define TK_INSERT 185
#define TK_VALUES 186 #define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -11,10 +11,17 @@ IF (TD_LINUX) ...@@ -11,10 +11,17 @@ IF (TD_LINUX)
LIST(REMOVE_ITEM SRC ./src/shellDarwin.c) LIST(REMOVE_ITEM SRC ./src/shellDarwin.c)
ADD_EXECUTABLE(shell ${SRC}) ADD_EXECUTABLE(shell ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(shell taos_static) TARGET_LINK_LIBRARIES(shell taos_static ${LINK_JEMALLOC})
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(shell taos) TARGET_LINK_LIBRARIES(shell taos ${LINK_JEMALLOC})
ENDIF () ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
......
...@@ -55,14 +55,21 @@ ENDIF () ...@@ -55,14 +55,21 @@ ENDIF ()
MESSAGE("TD_VERSION_NUMBER is:" ${TD_VERSION_NUMBER}) MESSAGE("TD_VERSION_NUMBER is:" ${TD_VERSION_NUMBER})
ADD_DEFINITIONS(-DTD_VERNUMBER="${TD_VERSION_NUMBER}") ADD_DEFINITIONS(-DTD_VERNUMBER="${TD_VERSION_NUMBER}")
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
IF (TD_LINUX) IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosdemo ${SRC}) ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson) TARGET_LINK_LIBRARIES(taosdemo taos_static cJson ${LINK_JEMALLOC})
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson) TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC})
ENDIF () ENDIF ()
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
...@@ -71,7 +78,7 @@ ELSEIF (TD_WINDOWS) ...@@ -71,7 +78,7 @@ ELSEIF (TD_WINDOWS)
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson) TARGET_LINK_LIBRARIES(taosdemo taos_static cJson)
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson}) TARGET_LINK_LIBRARIES(taosdemo taos cJson)
ENDIF () ENDIF ()
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
# missing a few dependencies, such as <argp.h> # missing a few dependencies, such as <argp.h>
......
...@@ -22,6 +22,10 @@ ...@@ -22,6 +22,10 @@
extern "C" { extern "C" {
#endif #endif
#ifdef TD_JEMALLOC_ENABLED
#include <jemalloc/jemalloc.h>
#endif
typedef enum { typedef enum {
TAOS_ALLOC_MODE_DEFAULT = 0, TAOS_ALLOC_MODE_DEFAULT = 0,
TAOS_ALLOC_MODE_RANDOM_FAIL = 1, TAOS_ALLOC_MODE_RANDOM_FAIL = 1,
......
...@@ -189,6 +189,7 @@ typedef struct SQueryAttr { ...@@ -189,6 +189,7 @@ typedef struct SQueryAttr {
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
...@@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E { ...@@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E {
OP_Filter = 19, OP_Filter = 19,
OP_Distinct = 20, OP_Distinct = 20,
OP_Join = 21, OP_Join = 21,
OP_StateWindow = 22,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo { ...@@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo {
int32_t start; // start row index int32_t start; // start row index
} SSWindowOperatorInfo; } SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
} SStateWindowOperatorInfo ;
typedef struct SDistinctOperatorInfo { typedef struct SDistinctOperatorInfo {
SHashObj *pSet; SHashObj *pSet;
SSDataBlock *pRes; SSDataBlock *pRes;
...@@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix); int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
......
...@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal { ...@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal {
SStrToken gap; SStrToken gap;
} SSessionWindowVal; } SSessionWindowVal;
typedef struct SWindowStateVal {
SStrToken col;
} SWindowStateVal;
struct SRelationInfo; struct SRelationInfo;
typedef struct SSqlNode { typedef struct SSqlNode {
...@@ -100,6 +104,7 @@ typedef struct SSqlNode { ...@@ -100,6 +104,7 @@ typedef struct SSqlNode {
SArray *fillType; // fill type[optional], SArray<tVariantListItem> SArray *fillType; // fill type[optional], SArray<tVariantListItem>
SIntervalVal interval; // (interval, interval_offset) [optional] SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional] SSessionWindowVal sessionVal; // session window [optional]
SWindowStateVal windowstateVal; // window_state(col) [optional]
SStrToken sliding; // sliding window [optional] SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional] SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional] SLimitVal slimit; // group limit offset [optional]
...@@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc ...@@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc
void tSqlExprListDestroy(SArray *pList); void tSqlExprListDestroy(SArray *pList);
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving); SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right); int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
......
...@@ -138,6 +138,7 @@ typedef struct SQueryInfo { ...@@ -138,6 +138,7 @@ typedef struct SQueryInfo {
bool hasFilter; bool hasFilter;
bool onlyTagQuery; bool onlyTagQuery;
bool orderProjectQuery; bool orderProjectQuery;
bool stateWindow;
} SQueryInfo; } SQueryInfo;
/** /**
......
...@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { ...@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement ///////////////////////////////// //////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*} %type select {SSqlNode*}
%destructor select {destroySqlNode($$);} %destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N); A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
} }
select(A) ::= LP select(B) RP. {A = B;} select(A) ::= LP select(B) RP. {A = B;}
...@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); } ...@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version() // select client_version()
// select server_state() // select server_state()
select(A) ::= SELECT(T) selcollist(W). { select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
} }
// selcollist is a list of expressions that are to become the return // selcollist is a list of expressions that are to become the return
...@@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. { ...@@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.col = V; X.col = V;
X.gap = Y; X.gap = Y;
} }
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . {X.col.n = 0;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. {
X.col = V;
}
%type fill_opt {SArray*} %type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);} %destructor fill_opt {taosArrayDestroy($$);}
......
...@@ -3299,8 +3299,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) { ...@@ -3299,8 +3299,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
if (pCtx->numOfParams == 2) { if (pCtx->numOfParams == 2) {
return; return;
} }
if (pCtx->param[0].i64 == 1) {
SET_VAL(pCtx, pCtx->size, 1);
} else {
INC_INIT_VAL(pCtx, pCtx->size);
}
INC_INIT_VAL(pCtx, pCtx->size);
char *pData = GET_INPUT_DATA_LIST(pCtx); char *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->order == TSDB_ORDER_ASC) { if (pCtx->order == TSDB_ORDER_ASC) {
......
...@@ -189,12 +189,16 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); ...@@ -189,12 +189,16 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
...@@ -731,7 +735,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -731,7 +735,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false; pCtx[k].preAggVals.isSet = false;
} }
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
...@@ -1299,7 +1302,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1299,7 +1302,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
} }
int32_t ret = int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex); setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1338,12 +1341,16 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1338,12 +1341,16 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
pInfo->start = j; pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) { } else if (tsList[j] - pInfo->prevTs <= gap) {
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j]; //pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1; pInfo->numOfRows += 1;
pInfo->start = j; if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0;
}
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
...@@ -1364,6 +1371,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1364,6 +1371,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
...@@ -1391,12 +1399,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { ...@@ -1391,12 +1399,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
} }
} }
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SQLFunctionCtx *pCtx = pInfo->binfo.pCtx; SQLFunctionCtx *pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it // not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData; char* d = pData;
...@@ -1767,6 +1775,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1767,6 +1775,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
break; break;
} }
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_Limit: { case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
...@@ -2109,6 +2122,8 @@ static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQu ...@@ -2109,6 +2122,8 @@ static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQu
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
bool hasFirstLastFunc = false; bool hasFirstLastFunc = false;
bool hasOtherFunc = false; bool hasOtherFunc = false;
...@@ -2212,7 +2227,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2212,7 +2227,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
} }
pQueryAttr->order.order = TSDB_ORDER_ASC; pQueryAttr->order.order = TSDB_ORDER_ASC;
} else if (onlyLastQuery(pQueryAttr)) { } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
...@@ -3204,7 +3219,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult ...@@ -3204,7 +3219,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0) { if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
if (pQueryAttr->groupbyColumn) { if (pQueryAttr->groupbyColumn) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
...@@ -4514,6 +4529,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4514,6 +4529,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_SessionWindow) { } else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info; SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_StateWindow) {
SStateWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
...@@ -4625,7 +4646,6 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4625,7 +4646,6 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
tfree(pInfo->currentGroupColData); tfree(pInfo->currentGroupColData);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
...@@ -5131,6 +5151,130 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5131,6 +5151,130 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
} }
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
continue;
}
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->start = j;
} else if (memcmp(pInfo->prevData, val, bytes) == 0) {
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows += 1;
//pInfo->start = j;
if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0;
}
} else {
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while (1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
if (pBlock == NULL) {
break;
}
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
if (pWindowInfo->colIndex == -1) {
pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQueryAttr->order.order = order;
pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -5140,6 +5284,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5140,6 +5284,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SSWindowOperatorInfo* pWindowInfo = pOperator->info; SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
...@@ -5152,6 +5297,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5152,6 +5297,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
} }
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//pQueryAttr->order.order = TSDB_ORDER_ASC;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
...@@ -5389,7 +5535,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -5389,7 +5535,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -5409,6 +5555,19 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5409,6 +5555,19 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput); doDestroyBasicInfo(pInfo, numOfOutput);
} }
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
}
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
...@@ -5463,7 +5622,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -5463,7 +5622,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate; pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -5589,7 +5748,29 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -5589,7 +5748,29 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "StateWindowOperator";
pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doStateWindowAgg;
pOperator->cleanup = destroyStateWindowOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
...@@ -5609,7 +5790,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5609,7 +5790,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg; pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroySWindowOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -6901,6 +7082,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -6901,6 +7082,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId; pQueryAttr->vgId = vgId;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
......
...@@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_SessionWindow; op = OP_SessionWindow;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Arithmetic;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
......
...@@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
* extract the select info out of sql string * extract the select info out of sql string
*/ */
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit, SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
tSqlExpr *pHaving) { SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelNodeList != NULL); assert(pSelNodeList != NULL);
SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode)); SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode));
...@@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat ...@@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col); TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col);
} }
if (pWindowStateVal != NULL) {
pSqlNode->windowstateVal = *pWindowStateVal;
} else {
TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col);
}
return pSqlNode; return pSqlNode;
} }
......
此差异已折叠。
...@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = { ...@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = {
{"VARIABLE", TK_VARIABLE}, {"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL}, {"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION}, {"SESSION", TK_SESSION},
{"STATE_WINDOW", TK_STATE_WINDOW},
{"FILL", TK_FILL}, {"FILL", TK_FILL},
{"SLIDING", TK_SLIDING}, {"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER}, {"ORDER", TK_ORDER},
......
...@@ -29,8 +29,8 @@ pipeline { ...@@ -29,8 +29,8 @@ pipeline {
agent none agent none
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/data/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community' WKC= '/data/lib/jenkins/workspace/TDinternal/community'
} }
stages { stages {
......
...@@ -6,9 +6,9 @@ events { ...@@ -6,9 +6,9 @@ events {
} }
http { http {
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;/blah/?.lua;;'; lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;$prefix/rest/?/init.lua;;';
lua_package_cpath "$prefix/so/?.so;;"; lua_package_cpath "$prefix/so/?.so;;";
lua_code_cache off; lua_code_cache on;
server { server {
listen 7000; listen 7000;
server_name restapi; server_name restapi;
......
local config = {
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024 ,
connection_pool_size = 64
}
return config
local _M = {}
local driver = require "luaconnector51"
local water_mark = 0
local occupied = 0
local connection_pool = {}
function _M.new(o,config)
o = o or {}
o.connection_pool = connection_pool
o.water_mark = water_mark
o.occupied = occupied
if #connection_pool == 0 then
for i = 1, config.connection_pool_size do
local res = driver.connect(config)
if res.code ~= 0 then
ngx.log(ngx.ERR, "connect--- failed:"..res.error)
return nil
else
local object = {obj = res.conn, state = 0}
table.insert(o.connection_pool,i, object)
ngx.log(ngx.INFO, "add connection, now pool size:"..#(o.connection_pool))
end
end
end
return setmetatable(o, { __index = _M })
end
function _M:get_connection()
local connection_obj
for i = 1, #connection_pool do
connection_obj = connection_pool[i]
if connection_obj.state == 0 then
connection_obj.state = 1
occupied = occupied +1
if occupied > water_mark then
water_mark = occupied
end
return connection_obj["obj"]
end
end
ngx.log(ngx.ERR,"ALERT! NO FREE CONNECTION.")
return nil
end
function _M:get_water_mark()
return water_mark
end
function _M:release_connection(conn)
local connection_obj
for i = 1, #connection_pool do
connection_obj = connection_pool[i]
if connection_obj["obj"] == conn then
connection_obj["state"] = 0
occupied = occupied -1
return
end
end
end
return _M
local driver = require "luaconnector51" local driver = require "luaconnector51"
local cjson = require "cjson" local cjson = require "cjson"
local Pool = require "tdpool"
local config = require "config"
ngx.say("start time:"..os.time()) ngx.say("start time:"..os.time())
local pool = Pool.new(Pool,config)
local config = { local conn = pool:get_connection()
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024
}
local conn
local res = driver.connect(config)
if res.code ~=0 then
ngx.say("connect--- failed: "..res.error)
return
else
conn = res.conn
ngx.say("connect--- pass.")
end
local res = driver.query(conn,"drop database if exists nginx") local res = driver.query(conn,"drop database if exists nginx")
if res.code ~=0 then if res.code ~=0 then
...@@ -51,7 +36,7 @@ else ...@@ -51,7 +36,7 @@ else
ngx.say("create table--- pass.") ngx.say("create table--- pass.")
end end
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')") res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 0, 'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then if res.code ~=0 then
ngx.say("insert records failed: "..res.error) ngx.say("insert records failed: "..res.error)
return return
...@@ -77,7 +62,29 @@ else ...@@ -77,7 +62,29 @@ else
end end
end end
driver.close(conn)
ngx.say("end time:"..os.time())
--ngx.log(ngx.ERR,"in test file.")
local flag = false
function query_callback(res)
if res.code ~=0 then
ngx.say("async_query_callback--- failed:"..res.error)
else
if(res.affected == 3) then
ngx.say("async_query_callback, insert records--- pass")
else
ngx.say("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
flag = true
end
driver.query_a(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 3, 'robotspace'),('2019-09-01 00:00:00.006', 4, 'Hilink'),('2019-09-01 00:00:00.007', 6, 'Harmony')", query_callback)
while not flag do
-- ngx.say("i am here once...")
ngx.sleep(0.001) -- time unit is second
end
ngx.say("pool water_mark:"..pool:get_water_mark())
pool:release_connection(conn)
ngx.say("end time:"..os.time())
...@@ -13,6 +13,11 @@ struct cb_param{ ...@@ -13,6 +13,11 @@ struct cb_param{
void * stream; void * stream;
}; };
struct async_query_callback_param{
lua_State* state;
int callback;
};
static int l_connect(lua_State *L){ static int l_connect(lua_State *L){
TAOS * taos=NULL; TAOS * taos=NULL;
const char* host; const char* host;
...@@ -23,7 +28,7 @@ static int l_connect(lua_State *L){ ...@@ -23,7 +28,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE); luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L,-1,"host"); lua_getfield(L, 1,"host");
if (lua_isstring(L,-1)){ if (lua_isstring(L,-1)){
host = lua_tostring(L, -1); host = lua_tostring(L, -1);
// printf("host = %s\n", host); // printf("host = %s\n", host);
...@@ -178,6 +183,58 @@ static int l_query(lua_State *L){ ...@@ -178,6 +183,58 @@ static int l_query(lua_State *L){
return 1; return 1;
} }
void async_query_callback(void *param, TAOS_RES *result, int code){
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
//printf("\nin c,numfields:%d\n", numFields);
//printf("\nin c, code:%d\n", code);
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
int table_index = lua_gettop(L);
if( code < 0){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
lua_setfield(L, table_index, "error");
}else{
//printf("success to async query.\n");
const int affectRows = taos_affected_rows(result);
//printf(" affect rows:%d\r\n", affectRows);
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
}
lua_call(L, 1, 0);
}
static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
// int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, "ok");
lua_setfield(L, table_index, "error");
return 1;
}
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param; struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
...@@ -308,6 +365,7 @@ static int l_close(lua_State *L){ ...@@ -308,6 +365,7 @@ static int l_close(lua_State *L){
static const struct luaL_Reg lib[] = { static const struct luaL_Reg lib[] = {
{"connect", l_connect}, {"connect", l_connect},
{"query", l_query}, {"query", l_query},
{"query_a",l_async_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream}, {"open_stream", l_open_stream},
{"close_stream", l_close_stream}, {"close_stream", l_close_stream},
......
...@@ -13,6 +13,11 @@ struct cb_param{ ...@@ -13,6 +13,11 @@ struct cb_param{
void * stream; void * stream;
}; };
struct async_query_callback_param{
lua_State* state;
int callback;
};
static int l_connect(lua_State *L){ static int l_connect(lua_State *L){
TAOS * taos=NULL; TAOS * taos=NULL;
const char* host; const char* host;
...@@ -56,6 +61,7 @@ static int l_connect(lua_State *L){ ...@@ -56,6 +61,7 @@ static int l_connect(lua_State *L){
lua_settop(L,0); lua_settop(L,0);
taos_init(); taos_init();
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
...@@ -177,6 +183,58 @@ static int l_query(lua_State *L){ ...@@ -177,6 +183,58 @@ static int l_query(lua_State *L){
return 1; return 1;
} }
void async_query_callback(void *param, TAOS_RES *result, int code){
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
//printf("\nin c,numfields:%d\n", numFields);
//printf("\nin c, code:%d\n", code);
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
int table_index = lua_gettop(L);
if( code < 0){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
lua_setfield(L, table_index, "error");
}else{
//printf("success to async query.\n");
const int affectRows = taos_affected_rows(result);
//printf(" affect rows:%d\r\n", affectRows);
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
}
lua_call(L, 1, 0);
}
static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
// int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, "ok");
lua_setfield(L, table_index, "error");
return 1;
}
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param; struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
...@@ -307,6 +365,7 @@ static int l_close(lua_State *L){ ...@@ -307,6 +365,7 @@ static int l_close(lua_State *L){
static const struct luaL_Reg lib[] = { static const struct luaL_Reg lib[] = {
{"connect", l_connect}, {"connect", l_connect},
{"query", l_query}, {"query", l_query},
{"query_a",l_async_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream}, {"open_stream", l_open_stream},
{"close_stream", l_close_stream}, {"close_stream", l_close_stream},
......
...@@ -110,7 +110,25 @@ else ...@@ -110,7 +110,25 @@ else
end end
end end
function callback(t) function async_query_callback(res)
if res.code ~=0 then
print("async_query_callback--- failed:"..res.error)
return
else
if(res.affected == 3) then
print("async_query_callback, insert records--- pass")
else
print("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
function stream_callback(t)
print("------------------------") print("------------------------")
print("continuous query result:") print("continuous query result:")
for key, value in pairs(t) do for key, value in pairs(t) do
...@@ -119,7 +137,7 @@ function callback(t) ...@@ -119,7 +137,7 @@ function callback(t)
end end
local stream local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback) res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
if res.code ~=0 then if res.code ~=0 then
print("open stream--- failed:"..res.error) print("open stream--- failed:"..res.error)
return return
...@@ -146,4 +164,5 @@ while loop_index < 30 do ...@@ -146,4 +164,5 @@ while loop_index < 30 do
end end
driver.close_stream(stream) driver.close_stream(stream)
driver.close(conn) driver.close(conn)
...@@ -29,8 +29,8 @@ pipeline { ...@@ -29,8 +29,8 @@ pipeline {
agent none agent none
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/data/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community' WKC= '/data/lib/jenkins/workspace/TDinternal/community'
} }
stages { stages {
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def isLuaInstalled(self):
if not which('lua'):
tdLog.exit("Lua not found!")
return False
else:
return True
def run(self):
tdSql.prepare()
# tdLog.info("Check if Lua installed")
# if not self.isLuaInstalled():
# sys.exit(1)
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
targetPath = buildPath + "/../tests/examples/lua"
tdLog.info(targetPath)
currentPath = os.getcwd()
os.chdir(targetPath)
os.system('./build.sh')
os.system('lua test.lua')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -335,4 +335,6 @@ python3 ./test.py -f tag_lite/alter_tag.py ...@@ -335,4 +335,6 @@ python3 ./test.py -f tag_lite/alter_tag.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
python3 ./test.py -f tag_lite/drop_auto_create.py python3 ./test.py -f tag_lite/drop_auto_create.py
python3 test.py -f insert/insert_before_use_db.py
#======================p4-end=============== #======================p4-end===============
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.error('insert into tb values (now + 10m, 10)')
tdSql.prepare()
tdSql.error('insert into tb values (now + 10m, 10)')
tdSql.execute('drop database db')
tdSql.error('insert into tb values (now + 10m, 10)')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -35,3 +35,5 @@ python3.8 ./test.py $1 -s && sleep 1 ...@@ -35,3 +35,5 @@ python3.8 ./test.py $1 -s && sleep 1
python3.8 ./test.py $1 -f client/client.py python3.8 ./test.py $1 -f client/client.py
python3.8 ./test.py $1 -s && sleep 1 python3.8 ./test.py $1 -s && sleep 1
# connector
python3.8 ./test.py $1 -f connector/lua.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册