diff --git a/cmake/cmake.options b/cmake/cmake.options index 62816a80d52f0460b9b14035c0032fef8dd7eafa..86096c18fe5cc5955e96827d6c6d1d286ba7f574 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -52,12 +52,22 @@ IF(${TD_WINDOWS}) ON ) ELSE () - - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) + include(CheckCXXCompilerFlag) + CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13) + IF(${COMPILER_SUPPORTS_CXX13}) + add_definitions(-DCOMPILER_SUPPORTS_CXX13) + option( + BUILD_TEST + "If build unit tests using googletest" + ON + ) + ELSE () + option( + BUILD_TEST + "If build unit tests using googletest" + OFF + ) + ENDIF () ENDIF () option( diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 4226587d56cbbeedd4011baa7cc9bbe8e0b08618..697a53e5708834503da4bf405cc799ca1a5cd4aa 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -199,7 +199,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experiment.use.snapshot", "false"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6b4772dc2ed131f5d23f37baea1f7ad80ffa090b..45fa94b3bf754d3ace94319b6f65a0ffd8336ff1 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,6 +36,7 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; + bool tqReader; } SReadHandle; typedef enum { @@ -133,7 +134,6 @@ int32_t qKillTask(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo); - /** * destroy query info structure * @param qHandle @@ -172,6 +172,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le */ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); + #ifdef __cplusplus } #endif diff --git a/packaging/cfg/tarbitratord.service b/packaging/cfg/tarbitratord.service deleted file mode 100644 index d60cb536b094fe6b6c472d55076dc4d1db669d68..0000000000000000000000000000000000000000 --- a/packaging/cfg/tarbitratord.service +++ /dev/null @@ -1,20 +0,0 @@ -[Unit] -Description=TDengine arbitrator service -After=network-online.target -Wants=network-online.target - -[Service] -Type=simple -ExecStart=/usr/bin/tarbitrator -TimeoutStopSec=1000000s -LimitNOFILE=infinity -LimitNPROC=infinity -LimitCORE=infinity -TimeoutStartSec=0 -StandardOutput=null -Restart=always -StartLimitBurst=3 -StartLimitInterval=60s - -[Install] -WantedBy=multi-user.target diff --git a/packaging/deb/tarbitratord b/packaging/deb/tarbitratord deleted file mode 100644 index 3f97c3c0c2143817ad4ecbb13fabd8c09fb44c69..0000000000000000000000000000000000000000 --- a/packaging/deb/tarbitratord +++ /dev/null @@ -1,88 +0,0 @@ -#!/bin/bash -# -# Modified from original source: Elastic Search -# https://github.com/elasticsearch/elasticsearch -# Thank you to the Elastic Search authors -# -# chkconfig: 2345 99 01 -# -### BEGIN INIT INFO -# Provides: taoscluster -# Required-Start: $local_fs $network $syslog -# Required-Stop: $local_fs $network $syslog -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Short-Description: Starts taoscluster tarbitrator -# Description: Starts taoscluster tarbitrator, a arbitrator -### END INIT INFO - -set -e - -PATH="/bin:/usr/bin:/sbin:/usr/sbin" -NAME="taoscluster" -USER="root" -GROUP="root" -DAEMON="/usr/local/taos/bin/tarbitrator" -DAEMON_OPTS="" -PID_FILE="/var/run/$NAME.pid" -APPARGS="" - -# Maximum number of open files -MAX_OPEN_FILES=65535 - -. /lib/lsb/init-functions - -case "$1" in - start) - - log_action_begin_msg "Starting tarbitrator..." - if start-stop-daemon --test --start --chuid "$USER:$GROUP" --background --make-pidfile --pidfile "$PID_FILE" --exec "$DAEMON" -- $APPARGS &> /dev/null; then - - touch "$PID_FILE" && chown "$USER":"$GROUP" "$PID_FILE" - - if [ -n "$MAX_OPEN_FILES" ]; then - ulimit -n $MAX_OPEN_FILES - fi - - start-stop-daemon --start --chuid "$USER:$GROUP" --background --make-pidfile --pidfile "$PID_FILE" --exec "$DAEMON" -- $APPARGS - - log_end_msg $? - fi - ;; - - stop) - log_action_begin_msg "Stopping tarbitrator..." - set +e - if [ -f "$PID_FILE" ]; then - start-stop-daemon --stop --pidfile "$PID_FILE" --user "$USER" --retry=TERM/120/KILL/5 > /dev/null - if [ $? -eq 1 ]; then - log_action_cont_msg "TSD is not running but pid file exists, cleaning up" - elif [ $? -eq 3 ]; then - PID="`cat $PID_FILE`" - log_failure_msg "Failed to stop tarbitrator (pid $PID)" - exit 1 - fi - rm -f "$PID_FILE" - else - log_action_cont_msg "tarbitrator was not running" - fi - log_action_end_msg 0 - set -e - ;; - - restart|force-reload) - if [ -f "$PID_FILE" ]; then - $0 stop - sleep 1 - fi - $0 start - ;; - status) - status_of_proc -p "$PID_FILE" "$DAEMON" "$NAME" - ;; - *) - exit 1 - ;; -esac - -exit 0 diff --git a/packaging/release.sh b/packaging/release.sh index 1d9f6dfc68088443e671aab6f6bb4c1c2909cef4..3426c2856d03573cb6ed6ec6078c38dfa6371cb0 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -111,9 +111,9 @@ else fi csudo="" -if command -v sudo > /dev/null; then - csudo="sudo " -fi +#if command -v sudo > /dev/null; then +# csudo="sudo " +#fi function is_valid_version() { [ -z $1 ] && return 1 || : @@ -181,7 +181,9 @@ cd "${curr_dir}" # 2. cmake executable file compile_dir="${top_dir}/debug" -${csudo}rm -rf ${compile_dir} +if [ -d ${compile_dir} ]; then + rm -rf ${compile_dir} +fi mkdir -p ${compile_dir} cd ${compile_dir} @@ -258,9 +260,9 @@ if [ "$osType" != "Darwin" ]; then if [[ "$pagMode" == "full" ]]; then if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then cd ${top_dir}/tools/taos-tools/packaging/deb + taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}') [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" - taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}') ${csudo}./make-taos-tools-deb.sh ${top_dir} \ ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} fi @@ -283,9 +285,9 @@ if [ "$osType" != "Darwin" ]; then if [[ "$pagMode" == "full" ]]; then if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then cd ${top_dir}/tools/taos-tools/packaging/rpm + taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g') [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" - taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g') ${csudo}./make-taos-tools-rpm.sh ${top_dir} \ ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} fi @@ -300,7 +302,6 @@ if [ "$osType" != "Darwin" ]; then ${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName} ${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName} - # ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} else # only make client for Darwin diff --git a/packaging/rpm/tarbitratord b/packaging/rpm/tarbitratord deleted file mode 100644 index 68138f5c1d5d4491b5fda52b08cfd51a039ffc64..0000000000000000000000000000000000000000 --- a/packaging/rpm/tarbitratord +++ /dev/null @@ -1,141 +0,0 @@ -#!/bin/bash -# -# tarbitratord This shell script takes care of starting and stopping tarbitrator. -# -# chkconfig: 2345 99 01 -# description: tarbitrator is a arbitrator used in TDengine cluster. -# -# -### BEGIN INIT INFO -# Provides: taoscluster -# Required-Start: $network $local_fs $remote_fs -# Required-Stop: $network $local_fs $remote_fs -# Short-Description: start and stop tarbitrator -# Description: tarbitrator is a arbitrator used in TDengine cluster. -### END INIT INFO - -# Source init functions -. /etc/init.d/functions - -# Maximum number of open files -MAX_OPEN_FILES=65535 - -# Default program options -NAME=tarbitrator -PROG=/usr/local/taos/bin/tarbitrator -USER=root -GROUP=root - -# Default directories -LOCK_DIR=/var/lock/subsys -PID_DIR=/var/run/$NAME - -# Set file names -LOCK_FILE=$LOCK_DIR/$NAME -PID_FILE=$PID_DIR/$NAME.pid - -[ -e $PID_DIR ] || mkdir -p $PID_DIR - -PROG_OPTS="" - -start() { - echo -n "Starting ${NAME}: " - # check identity - curid="`id -u -n`" - if [ "$curid" != root ] && [ "$curid" != "$USER" ] ; then - echo "Must be run as root or $USER, but was run as $curid" - return 1 - fi - # Sets the maximum number of open file descriptors allowed. - ulimit -n $MAX_OPEN_FILES - curulimit="`ulimit -n`" - if [ "$curulimit" -lt $MAX_OPEN_FILES ] ; then - echo "'ulimit -n' must be greater than or equal to $MAX_OPEN_FILES, is $curulimit" - return 1 - fi - - if [ "`id -u -n`" == root ] ; then - # Changes the owner of the lock, and the pid files to allow - # non-root OpenTSDB daemons to run /usr/share/opentsdb/bin/opentsdb_restart.py. - touch $LOCK_FILE && chown $USER:$GROUP $LOCK_FILE - touch $PID_FILE && chown $USER:$GROUP $PID_FILE - daemon --user $USER --pidfile $PID_FILE "$PROG $PROG_OPTS &> /dev/null &" - else - # Don't have to change user. - daemon --pidfile $PID_FILE "$PROG $PROG_OPTS &> /dev/null &" - fi - retval=$? - sleep 2 - echo - [ $retval -eq 0 ] && (findproc > $PID_FILE && touch $LOCK_FILE) - return $retval -} - -stop() { - echo -n "Stopping ${NAME}: " - killproc -p $PID_FILE $NAME - retval=$? - echo - # Non-root users don't have enough permission to remove pid and lock files. - # So, the opentsdb_restart.py cannot get rid of the files, and the command - # "service opentsdb status" will complain about the existing pid file. - # Makes the pid file empty. - echo > $PID_FILE - [ $retval -eq 0 ] && (rm -f $PID_FILE && rm -f $LOCK_FILE) - return $retval -} - -restart() { - stop - start -} - -reload() { - restart -} - -force_reload() { - restart -} - -rh_status() { - # run checks to determine if the service is running or use generic status - status -p $PID_FILE -l $LOCK_FILE $NAME -} - -rh_status_q() { - rh_status >/dev/null 2>&1 -} - -case "$1" in - start) - rh_status_q && exit 0 - $1 - ;; - stop) - rh_status_q || exit 0 - $1 - ;; - restart) - $1 - ;; - reload) - rh_status_q || exit 7 - $1 - ;; - force-reload) - force_reload - ;; - status) - rh_status - ;; - condrestart|try-restart) - rh_status_q || exit 0 - restart - ;; - *) - echo "Usage: $0 {start|stop|status|restart|condrestart|try-restart|reload|force-reload}" - exit 2 -esac - -exit $? diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 7c7d5477cfabdf532fe8c225c8ca9f351d4f7035..2d17ef481219f5ee747f7596d4d3d2a656e90fb9 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -194,7 +194,6 @@ function install_bin() { ${csudo}rm -f ${bin_link_dir}/${serverName} || : ${csudo}rm -f ${bin_link_dir}/${adapterName} || : ${csudo}rm -f ${bin_link_dir}/${uninstallScript} || : - ${csudo}rm -f ${bin_link_dir}/tarbitrator || : ${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : @@ -210,7 +209,6 @@ function install_bin() { [ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || : [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : - [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo}ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : if [ "$verMode" == "cluster" ]; then ${csudo}cp -r ${script_dir}/nginxd/* ${nginx_dir} && ${csudo}chmod 0555 ${nginx_dir}/* @@ -606,28 +604,19 @@ function install_service_on_sysvinit() { if ((${os_type} == 1)); then # ${csudo}cp -f ${script_dir}/init.d/${serverName}.deb ${install_main_dir}/init.d/${serverName} ${csudo}cp ${script_dir}/init.d/${serverName}.deb ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName} - # ${csudo}cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord - ${csudo}cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord elif ((${os_type} == 2)); then # ${csudo}cp -f ${script_dir}/init.d/${serverName}.rpm ${install_main_dir}/init.d/${serverName} ${csudo}cp ${script_dir}/init.d/${serverName}.rpm ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName} - # ${csudo}cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord - ${csudo}cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord fi if ((${initd_mod} == 1)); then ${csudo}chkconfig --add ${serverName} || : ${csudo}chkconfig --level 2345 ${serverName} on || : - ${csudo}chkconfig --add tarbitratord || : - ${csudo}chkconfig --level 2345 tarbitratord on || : elif ((${initd_mod} == 2)); then ${csudo}insserv ${serverName} || : ${csudo}insserv -d ${serverName} || : - ${csudo}insserv tarbitratord || : - ${csudo}insserv -d tarbitratord || : elif ((${initd_mod} == 3)); then ${csudo}update-rc.d ${serverName} defaults || : - ${csudo}update-rc.d tarbitratord defaults || : fi } @@ -669,9 +658,6 @@ function install_service_on_systemd() { ${csudo}systemctl enable ${serverName} - [ -f ${script_dir}/cfg/tarbitratord.service ] && - ${csudo}cp ${script_dir}/cfg/tarbitratord.service \ - ${service_config_dir}/ || : ${csudo}systemctl daemon-reload if [ "$verMode" == "cluster" ]; then diff --git a/packaging/tools/install_arbi.sh b/packaging/tools/install_arbi.sh deleted file mode 100755 index 28636401531e1e5f0c8eee4c1059303b0ac34612..0000000000000000000000000000000000000000 --- a/packaging/tools/install_arbi.sh +++ /dev/null @@ -1,340 +0,0 @@ -#!/bin/bash -# -# This file is used to install database on linux systems. The operating system -# is required to use systemd to manage services at boot - -set -e -#set -x - -# -----------------------Variables definition--------------------- -script_dir=$(dirname $(readlink -f "$0")) - -bin_link_dir="/usr/bin" -#inc_link_dir="/usr/include" - -#install main path -install_main_dir="/usr/local/tarbitrator" - -# old bin dir -bin_dir="/usr/local/tarbitrator/bin" - -service_config_dir="/etc/systemd/system" - -# Color setting -RED='\033[0;31m' -GREEN='\033[1;32m' -GREEN_DARK='\033[0;32m' -GREEN_UNDERLINE='\033[4;32m' -NC='\033[0m' - -csudo="" -if command -v sudo >/dev/null; then - csudo="sudo " -fi - -update_flag=0 - -initd_mod=0 -service_mod=2 -if pidof systemd &>/dev/null; then - service_mod=0 -elif $(which service &>/dev/null); then - service_mod=1 - service_config_dir="/etc/init.d" - if $(which chkconfig &>/dev/null); then - initd_mod=1 - elif $(which insserv &>/dev/null); then - initd_mod=2 - elif $(which update-rc.d &>/dev/null); then - initd_mod=3 - else - service_mod=2 - fi -else - service_mod=2 -fi - -# get the operating system type for using the corresponding init file -# ubuntu/debian(deb), centos/fedora(rpm), others: opensuse, redhat, ..., no verification -#osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release) -if [[ -e /etc/os-release ]]; then - osinfo=$(cat /etc/os-release | grep "NAME" | cut -d '"' -f2) || : -else - osinfo="" -fi -#echo "osinfo: ${osinfo}" -os_type=0 -if echo $osinfo | grep -qwi "ubuntu"; then - # echo "This is ubuntu system" - os_type=1 -elif echo $osinfo | grep -qwi "debian"; then - # echo "This is debian system" - os_type=1 -elif echo $osinfo | grep -qwi "Kylin"; then - # echo "This is Kylin system" - os_type=1 -elif echo $osinfo | grep -qwi "centos"; then - # echo "This is centos system" - os_type=2 -elif echo $osinfo | grep -qwi "fedora"; then - # echo "This is fedora system" - os_type=2 -else - echo " osinfo: ${osinfo}" - echo " This is an officially unverified linux system," - echo " if there are any problems with the installation and operation, " - echo " please feel free to contact taosdata.com for support." - os_type=1 -fi - -function kill_tarbitrator() { - pid=$(ps -ef | grep "tarbitrator" | grep -v "grep" | awk '{print $2}') - if [ -n "$pid" ]; then - ${csudo}kill -9 $pid || : - fi -} - -function install_main_path() { - #create install main dir and all sub dir - ${csudo}rm -rf ${install_main_dir} || : - ${csudo}mkdir -p ${install_main_dir} - ${csudo}mkdir -p ${install_main_dir}/bin - #${csudo}mkdir -p ${install_main_dir}/include - ${csudo}mkdir -p ${install_main_dir}/init.d -} - -function install_bin() { - # Remove links - ${csudo}rm -f ${bin_link_dir}/rmtarbitrator || : - ${csudo}rm -f ${bin_link_dir}/tarbitrator || : - ${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/* - - #Make link - [ -x ${install_main_dir}/bin/remove_arbi.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_arbi.sh ${bin_link_dir}/rmtarbitrator || : - [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo}ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : -} - -function install_header() { - ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || : - ${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* - ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h - ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h - ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h - ${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h -} - -function install_jemalloc() { - jemalloc_dir=${script_dir}/jemalloc - - if [ -d ${jemalloc_dir} ]; then - ${csudo}/usr/bin/install -c -d /usr/local/bin - - if [ -f ${jemalloc_dir}/bin/jemalloc-config ]; then - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jemalloc-config /usr/local/bin - fi - if [ -f ${jemalloc_dir}/bin/jemalloc.sh ]; then - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jemalloc.sh /usr/local/bin - fi - if [ -f ${jemalloc_dir}/bin/jeprof ]; then - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jeprof /usr/local/bin - fi - if [ -f ${jemalloc_dir}/include/jemalloc/jemalloc.h ]; then - ${csudo}/usr/bin/install -c -d /usr/local/include/jemalloc - ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/include/jemalloc/jemalloc.h /usr/local/include/jemalloc - fi - if [ -f ${jemalloc_dir}/lib/libjemalloc.so.2 ]; then - ${csudo}/usr/bin/install -c -d /usr/local/lib - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib - ${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so - ${csudo}/usr/bin/install -c -d /usr/local/lib - if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib - fi - if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then - ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib - fi - if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then - ${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig - ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig - fi - fi - if [ -f ${jemalloc_dir}/share/doc/jemalloc/jemalloc.html ]; then - ${csudo}/usr/bin/install -c -d /usr/local/share/doc/jemalloc - ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc - fi - if [ -f ${jemalloc_dir}/share/man/man3/jemalloc.3 ]; then - ${csudo}/usr/bin/install -c -d /usr/local/share/man/man3 - ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/share/man/man3/jemalloc.3 /usr/local/share/man/man3 - fi - - if [ -d /etc/ld.so.conf.d ]; then - echo "/usr/local/lib" | ${csudo}tee /etc/ld.so.conf.d/jemalloc.conf >/dev/null || echo -e "failed to write /etc/ld.so.conf.d/jemalloc.conf" - ${csudo}ldconfig - else - echo "/etc/ld.so.conf.d not found!" - fi - fi -} - -function clean_service_on_sysvinit() { - if pidof tarbitrator &>/dev/null; then - ${csudo}service tarbitratord stop || : - fi - - if ((${initd_mod} == 1)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}chkconfig --del tarbitratord || : - fi - elif ((${initd_mod} == 2)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}insserv -r tarbitratord || : - fi - elif ((${initd_mod} == 3)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}update-rc.d -f tarbitratord remove || : - fi - fi - - ${csudo}rm -f ${service_config_dir}/tarbitratord || : - - if $(which init &>/dev/null); then - ${csudo}init q || : - fi -} - -function install_service_on_sysvinit() { - clean_service_on_sysvinit - sleep 1 - - if ((${os_type} == 1)); then - ${csudo}cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord - ${csudo}cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord - elif ((${os_type} == 2)); then - ${csudo}cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord - ${csudo}cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord - fi - - if ((${initd_mod} == 1)); then - ${csudo}chkconfig --add tarbitratord || : - ${csudo}chkconfig --level 2345 tarbitratord on || : - elif ((${initd_mod} == 2)); then - ${csudo}insserv tarbitratord || : - ${csudo}insserv -d tarbitratord || : - elif ((${initd_mod} == 3)); then - ${csudo}update-rc.d tarbitratord defaults || : - fi -} - -function clean_service_on_systemd() { - tarbitratord_service_config="${service_config_dir}/tarbitratord.service" - if systemctl is-active --quiet tarbitratord; then - echo "tarbitrator is running, stopping it..." - ${csudo}systemctl stop tarbitratord &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable tarbitratord &>/dev/null || echo &>/dev/null - - ${csudo}rm -f ${tarbitratord_service_config} -} - -function install_service_on_systemd() { - clean_service_on_systemd - - tarbitratord_service_config="${service_config_dir}/tarbitratord.service" - - ${csudo}bash -c "echo '[Unit]' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'Description=TDengine arbitrator service' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'After=network-online.target' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'Wants=network-online.target' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo '[Service]' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'TimeoutStartSec=0' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'StandardOutput=null' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'Restart=always' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'StartLimitBurst=3' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'StartLimitInterval=60s' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo '[Install]' >> ${tarbitratord_service_config}" - ${csudo}bash -c "echo 'WantedBy=multi-user.target' >> ${tarbitratord_service_config}" - ${csudo}systemctl enable tarbitratord -} - -function install_service() { - if ((${service_mod} == 0)); then - install_service_on_systemd - elif ((${service_mod} == 1)); then - install_service_on_sysvinit - else - kill_tarbitrator - fi -} - -function update_TDengine() { - # Start to update - echo -e "${GREEN}Start to update TDengine's arbitrator ...${NC}" - # Stop the service if running - if pidof tarbitrator &>/dev/null; then - if ((${service_mod} == 0)); then - ${csudo}systemctl stop tarbitratord || : - elif ((${service_mod} == 1)); then - ${csudo}service tarbitratord stop || : - else - kill_tarbitrator - fi - sleep 1 - fi - - install_main_path - #install_header - install_bin - install_service - install_jemalloc - - echo - if ((${service_mod} == 0)); then - echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}systemctl start tarbitratord${NC}" - elif ((${service_mod} == 1)); then - echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}service tarbitratord start${NC}" - else - echo -e "${GREEN_DARK}To start arbitrator ${NC}: ./tarbitrator${NC}" - fi - echo - echo -e "\033[44;32;1mTDengine's arbitrator is updated successfully!${NC}" -} - -function install_TDengine() { - # Start to install - echo -e "${GREEN}Start to install TDengine's arbitrator ...${NC}" - - install_main_path - #install_header - install_bin - install_service - install_jemalloc - - echo - if ((${service_mod} == 0)); then - echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}systemctl start tarbitratord${NC}" - elif ((${service_mod} == 1)); then - echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}service tarbitratord start${NC}" - else - echo -e "${GREEN_DARK}To start arbitrator ${NC}: tarbitrator${NC}" - fi - - echo -e "\033[44;32;1mTDengine's arbitrator is installed successfully!${NC}" - echo -} - -## ==============================Main program starts from here============================ -# Install server and client -if [ -x ${bin_dir}/tarbitrator ]; then - update_flag=1 - update_TDengine -else - install_TDengine -fi diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 59be60f8fc362606542dd94956d4f99416131655..4e4ebb72c01474832a8319b20fea96813a6a761a 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -185,7 +185,6 @@ function install_bin() { [ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || : [ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || : ${csudo}cp -r ${binary_dir}/build/bin/${serverName} ${install_main_dir}/bin || : - # ${csudo}cp -r ${binary_dir}/build/bin/tarbitrator ${install_main_dir}/bin || : ${csudo}cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin || : ${csudo}cp -r ${script_dir}/remove.sh ${install_main_dir}/bin || : diff --git a/packaging/tools/makearbi.sh b/packaging/tools/makearbi.sh deleted file mode 100755 index cfae2b4a4643a375357630be21eb3ed635e7d37c..0000000000000000000000000000000000000000 --- a/packaging/tools/makearbi.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -# -# Generate arbitrator's tar.gz setup package for all os system - -set -e -#set -x - -curr_dir=$(pwd) -compile_dir=$1 -version=$2 -build_time=$3 -cpuType=$4 -osType=$5 -verMode=$6 -verType=$7 -pagMode=$8 - -script_dir="$(dirname $(readlink -f $0))" -top_dir="$(readlink -f ${script_dir}/../..)" - -productName="TDengine" - -# create compressed install file. -build_dir="${compile_dir}/build" -code_dir="${top_dir}" -release_dir="${top_dir}/release" - -#package_name='linux' -if [ "$verMode" == "cluster" ]; then - install_dir="${release_dir}/${productName}-enterprise-arbitrator-${version}" -else - install_dir="${release_dir}/${productName}-arbitrator-${version}" -fi - -# Directories and files. -bin_files="${build_dir}/bin/tarbitrator ${script_dir}/remove_arbi.sh" -install_files="${script_dir}/install_arbi.sh" - -#header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h" -init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord -init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord - -# make directories. -mkdir -p ${install_dir} && cp ${install_files} ${install_dir} && chmod a+x ${install_dir}/install_arbi.sh || : -#mkdir -p ${install_dir}/inc && cp ${header_files} ${install_dir}/inc || : -mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : -mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || : -mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || : - -cd ${release_dir} - -# install_dir has been distinguishes cluster from edege, so comments this code -pkg_name=${install_dir}-${osType}-${cpuType} - -if [[ "$verType" == "beta" ]] || [[ "$verType" == "preRelease" ]]; then - pkg_name=${install_dir}-${verType}-${osType}-${cpuType} -elif [ "$verType" == "stable" ]; then - pkg_name=${pkg_name} -else - echo "unknow verType, nor stabel or beta" - exit 1 -fi - -tar -zcv -f "$(basename ${pkg_name}).tar.gz" $(basename ${install_dir}) --remove-files || : -exitcode=$? -if [ "$exitcode" != "0" ]; then - echo "tar ${pkg_name}.tar.gz error !!!" - exit $exitcode -fi - -cd ${curr_dir} diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 0bc11b99b358f650386fc45f9f76aa00b8ec5ec8..cd656fe6f21a960f6682d87a25084d8e125a1dcf 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -85,7 +85,6 @@ else ${build_dir}/bin/${clientName} \ ${taostools_bin_files} \ ${build_dir}/bin/taosadapter \ - ${build_dir}/bin/tarbitrator\ ${script_dir}/remove.sh \ ${script_dir}/set_core.sh \ ${script_dir}/startPre.sh \ @@ -106,8 +105,6 @@ nginx_dir="${top_dir}/../enterprise/src/plugins/web" init_file_deb=${script_dir}/../deb/taosd init_file_rpm=${script_dir}/../rpm/taosd -init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord -init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord # make directories. mkdir -p ${install_dir} @@ -126,10 +123,6 @@ if [ -f "${cfg_dir}/${serverName}.service" ]; then cp ${cfg_dir}/${serverName}.service ${install_dir}/cfg || : fi -if [ -f "${top_dir}/packaging/cfg/tarbitratord.service" ]; then - cp ${top_dir}/packaging/cfg/tarbitratord.service ${install_dir}/cfg || : -fi - if [ -f "${top_dir}/packaging/cfg/nginxd.service" ]; then cp ${top_dir}/packaging/cfg/nginxd.service ${install_dir}/cfg || : fi @@ -137,8 +130,6 @@ fi mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm -mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || : -mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || : if [ $adapterName != "taosadapter" ]; then mv ${install_dir}/cfg/taosadapter.toml ${install_dir}/cfg/$adapterName.toml diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 413de17ee6e7b3d4212531de5945e56d7953271f..d34c81a4f59186ead2dd4ba58c4c14ebba52276d 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Script to stop the service and uninstall TDengine, but retain the config, data and log files. +# Script to stop and uninstall the service, but retain the config, data and log files. set -e #set -x @@ -11,121 +11,216 @@ RED='\033[0;31m' GREEN='\033[1;32m' NC='\033[0m' +installDir="/usr/local/taos" +serverName="taosd" +clientName="taos" +uninstallScript="rmtaos" +productName="TDengine" + #install main path -install_main_dir="/usr/local/taos" -data_link_dir="/usr/local/taos/data" -log_link_dir="/usr/local/taos/log" -cfg_link_dir="/usr/local/taos/cfg" +install_main_dir=${installDir} +data_link_dir=${installDir}/data +log_link_dir=${installDir}/log +cfg_link_dir=${installDir}/cfg bin_link_dir="/usr/bin" +local_bin_link_dir="/usr/local/bin" lib_link_dir="/usr/lib" lib64_link_dir="/usr/lib64" inc_link_dir="/usr/include" +install_nginxd_dir="/usr/local/nginxd" service_config_dir="/etc/systemd/system" -taos_service_name="taosd" - +taos_service_name=${serverName} +taosadapter_service_name="taosadapter" +tarbitrator_service_name="tarbitratord" +nginx_service_name="nginxd" csudo="" -if command -v sudo > /dev/null; then - csudo="sudo" +if command -v sudo >/dev/null; then + csudo="sudo " fi initd_mod=0 service_mod=2 -if pidof systemd &> /dev/null; then - service_mod=0 -elif $(which service &> /dev/null); then - service_mod=1 - service_config_dir="/etc/init.d" - if $(which chkconfig &> /dev/null); then - initd_mod=1 - elif $(which insserv &> /dev/null); then - initd_mod=2 - elif $(which update-rc.d &> /dev/null); then - initd_mod=3 - else - service_mod=2 - fi -else +if pidof systemd &>/dev/null; then + service_mod=0 +elif $(which service &>/dev/null); then + service_mod=1 + service_config_dir="/etc/init.d" + if $(which chkconfig &>/dev/null); then + initd_mod=1 + elif $(which insserv &>/dev/null); then + initd_mod=2 + elif $(which update-rc.d &>/dev/null); then + initd_mod=3 + else service_mod=2 + fi +else + service_mod=2 fi +function kill_taosadapter() { + pid=$(ps -ef | grep "taosadapter" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + ${csudo}kill -9 $pid || : + fi +} function kill_taosd() { - pid=$(ps -ef | grep "taosd" | grep -v "grep" | awk '{print $2}') + pid=$(ps -ef | grep ${serverName} | grep -v "grep" | awk '{print $2}') if [ -n "$pid" ]; then - ${csudo} kill -9 $pid || : + ${csudo}kill -9 $pid || : fi } +function kill_tarbitrator() { + pid=$(ps -ef | grep "tarbitrator" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + ${csudo}kill -9 $pid || : + fi +} function clean_bin() { - # Remove link - ${csudo} rm -f ${bin_link_dir}/taos || : - ${csudo} rm -f ${bin_link_dir}/taosd || : - ${csudo} rm -f ${bin_link_dir}/create_table || : - ${csudo} rm -f ${bin_link_dir}/tmq_sim || : - ${csudo} rm -f ${bin_link_dir}/taosdemo || : - ${csudo} rm -f ${bin_link_dir}/taosdump || : - ${csudo} rm -f ${bin_link_dir}/rmtaos || : - #${csudo} rm -f ${bin_link_dir}/set_core || : + # Remove link + ${csudo}rm -f ${bin_link_dir}/${clientName} || : + ${csudo}rm -f ${bin_link_dir}/${serverName} || : + ${csudo}rm -f ${bin_link_dir}/taosadapter || : + ${csudo}rm -f ${bin_link_dir}/taosBenchmark || : + ${csudo}rm -f ${bin_link_dir}/taosdemo || : + ${csudo}rm -f ${bin_link_dir}/taosdump || : + ${csudo}rm -f ${bin_link_dir}/${uninstallScript} || : + ${csudo}rm -f ${bin_link_dir}/tarbitrator || : + ${csudo}rm -f ${bin_link_dir}/set_core || : + ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : +} + +function clean_local_bin() { + ${csudo}rm -f ${local_bin_link_dir}/taosBenchmark || : + ${csudo}rm -f ${local_bin_link_dir}/taosdemo || : } function clean_lib() { - # Remove link - ${csudo} rm -f ${lib_link_dir}/libtaos.* || : - ${csudo} rm -f ${lib64_link_dir}/libtaos.* || : - - ${csudo} rm -f ${lib_link_dir}/libtdb.* || : - ${csudo} rm -f ${lib64_link_dir}/libtdb.* || : + # Remove link + ${csudo}rm -f ${lib_link_dir}/libtaos.* || : + ${csudo}rm -f ${lib64_link_dir}/libtaos.* || : + #${csudo}rm -rf ${v15_java_app_dir} || : } function clean_header() { - # Remove link - ${csudo} rm -f ${inc_link_dir}/taos.h || : - ${csudo} rm -f ${inc_link_dir}/taosdef.h || : - ${csudo} rm -f ${inc_link_dir}/taoserror.h || : - ${csudo} rm -f ${inc_link_dir}/taosudf.h || : + # Remove link + ${csudo}rm -f ${inc_link_dir}/taos.h || : + ${csudo}rm -f ${inc_link_dir}/taosdef.h || : + ${csudo}rm -f ${inc_link_dir}/taoserror.h || : } function clean_config() { - # Remove link - ${csudo} rm -f ${cfg_link_dir}/* || : + # Remove link + ${csudo}rm -f ${cfg_link_dir}/* || : } function clean_log() { - # Remove link - ${csudo} rm -rf ${log_link_dir} || : + # Remove link + ${csudo}rm -rf ${log_link_dir} || : } function clean_service_on_systemd() { - taosd_service_config="${service_config_dir}/${taos_service_name}.service" - if systemctl is-active --quiet ${taos_service_name}; then - echo "TDengine taosd is running, stopping it..." - ${csudo} systemctl stop ${taos_service_name} &> /dev/null || echo &> /dev/null + taosd_service_config="${service_config_dir}/${taos_service_name}.service" + if systemctl is-active --quiet ${taos_service_name}; then + echo "${productName} ${serverName} is running, stopping it..." + ${csudo}systemctl stop ${taos_service_name} &>/dev/null || echo &>/dev/null + fi + ${csudo}systemctl disable ${taos_service_name} &>/dev/null || echo &>/dev/null + ${csudo}rm -f ${taosd_service_config} + + taosadapter_service_config="${service_config_dir}/taosadapter.service" + if systemctl is-active --quiet ${taosadapter_service_name}; then + echo "${productName} taosAdapter is running, stopping it..." + ${csudo}systemctl stop ${taosadapter_service_name} &>/dev/null || echo &>/dev/null + fi + ${csudo}systemctl disable ${taosadapter_service_name} &>/dev/null || echo &>/dev/null + [ -f ${taosadapter_service_config} ] && ${csudo}rm -f ${taosadapter_service_config} + + tarbitratord_service_config="${service_config_dir}/${tarbitrator_service_name}.service" + if systemctl is-active --quiet ${tarbitrator_service_name}; then + echo "${productName} tarbitrator is running, stopping it..." + ${csudo}systemctl stop ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null + fi + ${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null + ${csudo}rm -f ${tarbitratord_service_config} + + if [ "$verMode" == "cluster" ]; then + nginx_service_config="${service_config_dir}/${nginx_service_name}.service" + if [ -d ${install_nginxd_dir} ]; then + if systemctl is-active --quiet ${nginx_service_name}; then + echo "Nginx for ${productName} is running, stopping it..." + ${csudo}systemctl stop ${nginx_service_name} &>/dev/null || echo &>/dev/null + fi + ${csudo}systemctl disable ${nginx_service_name} &>/dev/null || echo &>/dev/null + ${csudo}rm -f ${nginx_service_config} fi - ${csudo} systemctl disable ${taos_service_name} &> /dev/null || echo &> /dev/null - ${csudo} rm -f ${taosd_service_config} + fi } function clean_service_on_sysvinit() { - echo " " + if pidof ${serverName} &>/dev/null; then + echo "${productName} ${serverName} is running, stopping it..." + ${csudo}service ${serverName} stop || : + fi + + if pidof tarbitrator &>/dev/null; then + echo "${productName} tarbitrator is running, stopping it..." + ${csudo}service tarbitratord stop || : + fi + + if ((${initd_mod} == 1)); then + if [ -e ${service_config_dir}/${serverName} ]; then + ${csudo}chkconfig --del ${serverName} || : + fi + if [ -e ${service_config_dir}/tarbitratord ]; then + ${csudo}chkconfig --del tarbitratord || : + fi + elif ((${initd_mod} == 2)); then + if [ -e ${service_config_dir}/${serverName} ]; then + ${csudo}insserv -r ${serverName} || : + fi + if [ -e ${service_config_dir}/tarbitratord ]; then + ${csudo}insserv -r tarbitratord || : + fi + elif ((${initd_mod} == 3)); then + if [ -e ${service_config_dir}/${serverName} ]; then + ${csudo}update-rc.d -f ${serverName} remove || : + fi + if [ -e ${service_config_dir}/tarbitratord ]; then + ${csudo}update-rc.d -f tarbitratord remove || : + fi + fi + + ${csudo}rm -f ${service_config_dir}/${serverName} || : + ${csudo}rm -f ${service_config_dir}/tarbitratord || : + + if $(which init &>/dev/null); then + ${csudo}init q || : + fi } function clean_service() { - if ((${service_mod}==0)); then - clean_service_on_systemd - elif ((${service_mod}==1)); then - clean_service_on_sysvinit - else - # must manual stop taosd - kill_taosd - fi + if ((${service_mod} == 0)); then + clean_service_on_systemd + elif ((${service_mod} == 1)); then + clean_service_on_sysvinit + else + kill_taosadapter + kill_taosd + kill_tarbitrator + fi } # Stop service and disable booting start. clean_service # Remove binary file and links clean_bin +# Remove links of local bin +clean_local_bin # Remove header file. clean_header # Remove lib file @@ -135,15 +230,26 @@ clean_log # Remove link configuration file clean_config # Remove data link directory -${csudo} rm -rf ${data_link_dir} || : - -${csudo} rm -rf ${install_main_dir} +${csudo}rm -rf ${data_link_dir} || : +${csudo}rm -rf ${install_main_dir} +${csudo}rm -rf ${install_nginxd_dir} if [[ -e /etc/os-release ]]; then osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release) else osinfo="" fi -echo -e "${GREEN}TDengine is removed successfully!${NC}" +if echo $osinfo | grep -qwi "ubuntu"; then + # echo "this is ubuntu system" + ${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || : +elif echo $osinfo | grep -qwi "debian"; then + # echo "this is debian system" + ${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || : +elif echo $osinfo | grep -qwi "centos"; then + # echo "this is centos system" + ${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || : +fi + +echo -e "${GREEN}${productName} is removed successfully!${NC}" echo diff --git a/packaging/tools/remove_arbi.sh b/packaging/tools/remove_arbi.sh deleted file mode 100755 index c95c579d309257dcd41076fc69cd6b3a5d0032f1..0000000000000000000000000000000000000000 --- a/packaging/tools/remove_arbi.sh +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/bash -# -# Script to stop the service and uninstall TDengine's arbitrator - -set -e -#set -x - -verMode=edge - -RED='\033[0;31m' -GREEN='\033[1;32m' -NC='\033[0m' - -#install main path -install_main_dir="/usr/local/tarbitrator" -bin_link_dir="/usr/bin" -#inc_link_dir="/usr/include" - -service_config_dir="/etc/systemd/system" -tarbitrator_service_name="tarbitratord" -csudo="" -if command -v sudo > /dev/null; then - csudo="sudo " -fi - -initd_mod=0 -service_mod=2 -if pidof systemd &> /dev/null; then - service_mod=0 -elif $(which service &> /dev/null); then - service_mod=1 - service_config_dir="/etc/init.d" - if $(which chkconfig &> /dev/null); then - initd_mod=1 - elif $(which insserv &> /dev/null); then - initd_mod=2 - elif $(which update-rc.d &> /dev/null); then - initd_mod=3 - else - service_mod=2 - fi -else - service_mod=2 -fi - -function kill_tarbitrator() { - pid=$(ps -ef | grep "tarbitrator" | grep -v "grep" | awk '{print $2}') - if [ -n "$pid" ]; then - ${csudo}kill -9 $pid || : - fi -} -function clean_bin() { - # Remove link - ${csudo}rm -f ${bin_link_dir}/tarbitrator || : -} - -function clean_header() { - # Remove link - ${csudo}rm -f ${inc_link_dir}/taos.h || : - ${csudo}rm -f ${inc_link_dir}/taosdef.h || : - ${csudo}rm -f ${inc_link_dir}/taoserror.h || : - ${csudo}rm -f ${inc_link_dir}/taosudf.h || : - -} - -function clean_log() { - # Remove link - ${csudo}rm -rf /arbitrator.log || : -} - -function clean_service_on_systemd() { - tarbitratord_service_config="${service_config_dir}/${tarbitrator_service_name}.service" - - if systemctl is-active --quiet ${tarbitrator_service_name}; then - echo "TDengine tarbitrator is running, stopping it..." - ${csudo}systemctl stop ${tarbitrator_service_name} &> /dev/null || echo &> /dev/null - fi - ${csudo}systemctl disable ${tarbitrator_service_name} &> /dev/null || echo &> /dev/null - - ${csudo}rm -f ${tarbitratord_service_config} -} - -function clean_service_on_sysvinit() { - if pidof tarbitrator &> /dev/null; then - echo "TDengine's tarbitrator is running, stopping it..." - ${csudo}service tarbitratord stop || : - fi - - if ((${initd_mod}==1)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}chkconfig --del tarbitratord || : - fi - elif ((${initd_mod}==2)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}insserv -r tarbitratord || : - fi - elif ((${initd_mod}==3)); then - if [ -e ${service_config_dir}/tarbitratord ]; then - ${csudo}update-rc.d -f tarbitratord remove || : - fi - fi - - ${csudo}rm -f ${service_config_dir}/tarbitratord || : - - if $(which init &> /dev/null); then - ${csudo}init q || : - fi -} - -function clean_service() { - if ((${service_mod}==0)); then - clean_service_on_systemd - elif ((${service_mod}==1)); then - clean_service_on_sysvinit - else - # must manual stop - kill_tarbitrator - fi -} - -# Stop service and disable booting start. -clean_service -# Remove binary file and links -clean_bin -# Remove header file. -##clean_header -# Remove log file -clean_log - -${csudo}rm -rf ${install_main_dir} - -echo -e "${GREEN}TDengine's arbitrator is removed successfully!${NC}" diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 09fb73cfbaf4a67d379ab1b4f6b1fa6af52960b8..4a537a3aeb6b2134f51b2b3928d4437f0ec91ed3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1407,7 +1407,11 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU } SSyncQueryParam* pParam = pRequest->body.param; - + if (NULL == pParam) { + pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); + tsem_init(&pParam->sem, 0, 0); + } + // convert ucs4 to native multi-bytes string pResultInfo->convertUcs4 = convertUcs4; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 1cb0e2d54bd184406cacac52d106bfdf8f5f2368..5039f8bbaf7d23cc5a9e18e02cfda18419fb5ec3 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2456,10 +2456,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr return NULL; } + int batchs = 0; pTscObj->schemalessType = 1; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - int cnt = ceil(((double)numLines) / LINE_BATCH); Params params; params.request = request; tsem_init(¶ms.sem, 0, 0); @@ -2496,7 +2496,16 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - for (int i = 0; i < cnt; ++i) { + if(protocol == TSDB_SML_JSON_PROTOCOL){ + numLines = 1; + }else if(numLines <= 0){ + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL); + goto end; + } + + batchs = ceil(((double)numLines) / LINE_BATCH); + for (int i = 0; i < batchs; ++i) { SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!req){ request->code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 5493628e747bb00598e32761e55018f0c3701772..aa78649ba7c276f64c0dcb15e50e848d40c125a3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -54,7 +54,8 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; int8_t withTbName; - int8_t useSnapshot; + int8_t spEnable; + int32_t spBatchSize; uint16_t port; int32_t autoCommitInterval; char* ip; @@ -288,18 +289,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcmp(key, "experiment.use.snapshot") == 0) { + if (strcmp(key, "experimental.snapshot.enable") == 0) { if (strcmp(value, "true") == 0) { - conf->useSnapshot = true; + conf->spEnable = true; return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { - conf->useSnapshot = false; + conf->spEnable = false; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } } + if (strcmp(key, "experimental.snapshot.batch.size") == 0) { + conf->spBatchSize = atoi(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -918,7 +924,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->withTbName = conf->withTbName; - pTmq->useSnapshot = conf->useSnapshot; + pTmq->useSnapshot = conf->spEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index cef0bbbc016aaf541cc48238a2a9bbb6ecf4e492..4dc11a48158022c578ba2d5734a3f112c12a05c1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,6 +116,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 +int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid); int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList); tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId, uint64_t taskId); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 3e15fd9c57735ece5ad49c3381f52c11ca9551fa..b455d779c16887c6b12bcfb7985d2778232bcb32 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -48,7 +48,6 @@ struct SSmaEnv { typedef struct { int32_t smaRef; - int32_t refId; } SSmaMgmt; #define SMA_ENV_LOCK(env) ((env)->lock) @@ -63,12 +62,12 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; - int64_t refId; - void *tmrHandle; - tmr_h tmrId; - int32_t tmrSeconds; - int8_t triggerStat; - int8_t runningStat; + int64_t refId; // shared by persistence/fetch tasks + void *tmrHandle; // for persistence task + tmr_h tmrId; // for persistence task + int32_t tmrSeconds; // for persistence task + int8_t triggerStat; // for persistence task + int8_t runningStat; // for persistence task SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 38a4f2804197205d45238faa053485f7e17f40d0..cab4805b204363d0e36b7f43beda61871906fdf1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -161,7 +161,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* // tqExec int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId); +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta @@ -183,6 +183,17 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore); // tqSink void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; + pOffsetVal->uid = uid; + pOffsetVal->ts = ts; +} + +static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { + pOffsetVal->type = TMQ_OFFSET__LOG; + pOffsetVal->version = ver; +} + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e70e79fb6fbca87617e67ce2209d2af7b6e45c4b..120d6612a25665b295190434cc00f2ac42e5ce21 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -135,7 +135,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS // init smaMgmt smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); - if (smaMgmt.refId < 0) { + if (smaMgmt.smaRef < 0) { smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4105c0164f6ee2c997224464e1c2d76f4111b456..41af641e9ec5787fe24f9723c52d2da9efcdc440 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -50,6 +50,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; + int64_t refId; void *taskInfo; // qTaskInfo_t tmr_h tmrId; int8_t level; @@ -60,11 +61,14 @@ struct SRSmaInfoItem { struct SRSmaInfo { STSchema *pTSchema; - SSma *pSma; + SRSmaStat *pStat; int64_t suid; SRSmaInfoItem items[TSDB_RETENTION_L2]; }; +#define RSMA_INFO_SMA(r) ((r)->pStat->pSma) +#define RSMA_INFO_STAT(r) ((r)->pStat) + struct SRSmaQTaskInfoItem { int32_t len; int8_t type; @@ -107,22 +111,21 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { if (pInfo) { + SSma *pSma = RSMA_INFO_SMA(pInfo); for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (pItem->taskInfo) { - smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pInfo->pSma), pInfo->suid, - pItem->tmrId, i + 1); + smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, + i + 1); taosTmrStopA(&pItem->tmrId); - tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pInfo->pSma), i + 1); + tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", - SMA_VID(pInfo->pSma), pInfo->suid, i + 1); + smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), + pInfo->suid, i + 1); } } taosMemoryFree(pInfo->pTSchema); taosMemoryFree(pInfo); - } else { - smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info since empty", SMA_VID(pInfo->pSma), pInfo->suid); } return NULL; @@ -255,6 +258,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo if (param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); + pItem->refId = RSMA_REF_ID(pRSmaInfo->pStat); pItem->pRsmaInfo = pRSmaInfo; pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); if (!pItem->taskInfo) { @@ -340,7 +344,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } pRSmaInfo->pTSchema = pTSchema; - pRSmaInfo->pSma = pSma; + pRSmaInfo->pStat = pStat; pRSmaInfo->suid = suid; if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { @@ -522,7 +526,7 @@ static void tdDestroySDataBlockArray(SArray *pArray) { static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { SArray *pResult = NULL; SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; - SSma *pSma = pRSmaInfo->pSma; + SSma *pSma = RSMA_INFO_SMA(pRSmaInfo); while (1) { SSDataBlock *output = NULL; @@ -585,21 +589,29 @@ _err: */ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; - SSma *pSma = pItem->pRsmaInfo->pSma; - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); + SSma *pSma = NULL; + SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId); + if (!pStat) { + smaDebug("rsma fetch task not start since already destroyed"); + return; + } + pSma = RSMA_INFO_SMA(pItem->pRsmaInfo); + + // if rsma trigger stat in cancelled or finished, not start fetch task anymore int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) { - smaDebug("vgId:%d, level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma), - pItem->level, pItem->pRsmaInfo->suid); + taosReleaseRef(smaMgmt.smaRef, pItem->refId); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled", + SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); return; } int8_t fetchTriggerStat = atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) { - smaDebug("vgId:%d, level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pItem->pRsmaInfo->suid); + smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), + pItem->level, pItem->pRsmaInfo->suid); tdRefSmaStat(pSma, (SSmaStat *)pStat); @@ -610,9 +622,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdUnRefSmaStat(pSma, (SSmaStat *)pStat); } else { - smaDebug("vgId:%d, level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pItem->pRsmaInfo->suid); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); } +_end: + taosReleaseRef(smaMgmt.smaRef, pItem->refId); } static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, @@ -632,7 +646,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma)); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); @@ -1036,7 +1049,7 @@ static void *tdRSmaPersistExec(void *param) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; if (!taskInfo) { - smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; } @@ -1044,27 +1057,20 @@ static void *tdRSmaPersistExec(void *param) { int32_t len = 0; int8_t type = (int8_t)(i + 1); if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { - smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, - terrstr(terrno)); + smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid, + i + 1, terrstr(terrno)); goto _err; } if (!pOutput || len <= 0) { - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist", + smaDebug("vgId:%d, rsma, table %" PRIi64 + " level %d serialize qTaskInfo success but no output(len %d), not persist", vid, pRSmaInfo->suid, i + 1, len); taosMemoryFreeClear(pOutput); continue; } - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid, + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid, pRSmaInfo->suid, i + 1, len); -#if 0 - if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { - smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr(terrno)); - } else { - smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); - } -#endif if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; @@ -1084,11 +1090,11 @@ static void *tdRSmaPersistExec(void *param) { ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); - smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, pRSmaInfo->suid, i + 1, headLen, toffset); tdAppendTFile(&tFile, pOutput, len, &toffset); - smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, - i + 1, len, toffset); + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, + pRSmaInfo->suid, i + 1, len, toffset); taosMemoryFree(pOutput); } @@ -1098,13 +1104,13 @@ static void *tdRSmaPersistExec(void *param) { _normal: if (isFileCreated) { if (tdUpdateTFileHeader(&tFile) < 0) { - smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), + smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), tstrerror(terrno)); tdCloseTFile(&tFile); tdRemoveTFile(&tFile); goto _err; } else { - smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); + smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); } tdCloseTFile(&tFile); @@ -1114,10 +1120,10 @@ _normal: char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]); strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { - smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); goto _err; } else { - smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); } } goto _end; @@ -1129,13 +1135,13 @@ _end: if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INACTIVE, TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, persist task is active again", vid); + smaDebug("vgId:%d, rsma persist task is active again", vid); } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, persist task is cancelled", vid); + smaDebug("vgId:%d, rsma persist task is cancelled", vid); } else { - smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); + smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); ASSERT(0); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); @@ -1179,9 +1185,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { */ static void tdRSmaPersistTrigger(void *param, void *tmrId) { SRSmaStat *rsmaStat = param; - int64_t refId = rsmaStat->refId; + SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId); - SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId); if (!pRSmaStat) { smaDebug("rsma persistence task not start since already destroyed"); return; @@ -1221,5 +1226,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); } break; } - taosReleaseRef(smaMgmt.smaRef, refId); + taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a08515ed5c860d4a868ffe7ca8074310b9b0f2e2..c0d184ee114bd88fbee18f63cd42583fcdd20520 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -228,17 +228,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } -static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { - pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; - pOffsetVal->uid = uid; - pOffsetVal->ts = ts; -} - -static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { - pOffsetVal->type = TMQ_OFFSET__LOG; - pOffsetVal->version = ver; -} - int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -394,13 +383,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { char formatBuf[50]; tFormatOffset(formatBuf, 50, &dataRsp.reqOffset); tqInfo("retrieve using snapshot req offset %s", formatBuf); - if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, workerId) < 0) { + if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { ASSERT(0); } // 4. send rsp if (dataRsp.blockNum != 0) { - tqOffsetResetToData(&dataRsp.rspOffset, 0, 0); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } @@ -655,6 +643,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .reader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, + .tqReader = true, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index eb09199c7045f5396ddabbdb52705d1902a91e48..9dd2a0258fd25a5a18c9f67a2b6ca7b1076af62a 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -59,13 +59,19 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i return 0; } -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId) { +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->execCol.task[workerId]; - // TODO set uid and ts - if (qStreamScanSnapshot(task) < 0) { + + /*if (qStreamScanSnapshot(task) < 0) {*/ + /*ASSERT(0);*/ + /*}*/ + + if (qStreamPrepareScan(task, offset.uid, offset.ts) < 0) { ASSERT(0); } + + int32_t rowCnt = 0; while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -80,13 +86,27 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, i tqAddBlockDataToRsp(pDataBlock, pRsp); if (pRsp->withTbName) { - // TODO pRsp->withTbName = 0; - /*int64_t uid = 0;*/ - /*tqAddTbNameToRsp(pTq, uid, pRsp, workerId);*/ +#if 0 + int64_t uid; + int64_t ts; + if (qGetStreamScanStatus(task, &uid, &ts) < 0) { + ASSERT(0); + } + tqAddTbNameToRsp(pTq, uid, pRsp, workerId); +#endif } pRsp->blockNum++; + + rowCnt += pDataBlock->info.rows; + if (rowCnt >= 4096) break; + } + int64_t uid; + int64_t ts; + if (qGetStreamScanStatus(task, &uid, &ts) < 0) { + ASSERT(0); } + tqOffsetResetToData(&pRsp->rspOffset, uid, ts); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 03f18cc766b96cd4318900d0ed0e7a6f4a32974a..4604e3699ff05b4236c4640202724cbe4fd49cb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -112,9 +112,9 @@ typedef struct STsdbReadHandle { STimeWindow window; // the primary query time window that applies to all queries // SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time // SColumnDataAgg** pstatis;// the ptr array list to return to caller - int32_t numOfBlocks; + int32_t numOfBlocks; SSDataBlock* pResBlock; -// SArray* pColumns; // column list, SColumnInfoData array list + // SArray* pColumns; // column list, SColumnInfoData array list bool locateStart; int32_t outputCapacity; int32_t realNumOfRows; @@ -223,6 +223,22 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { return rows; } +static SArray* createCheckInfoFromUid(STsdbReadHandle* pTsdbReadHandle, int64_t uid) { + SArray* pTableCheckInfo = taosArrayInit(1, sizeof(STableCheckInfo)); + if (pTableCheckInfo == NULL) { + return NULL; + } + STableCheckInfo info = { + .tableId = uid, + }; + info.suid = pTsdbReadHandle->suid; + + taosArrayPush(pTableCheckInfo, &info); + tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey, + pTsdbReadHandle->idStr); + return pTableCheckInfo; +} + static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) { size_t tableSize = taosArrayGetSize(pTableList); @@ -428,8 +444,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {.info = pCond->colList[i], 0}; - int32_t code = blockDataAppendColInfo(pReadHandle->pResBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS){ + int32_t code = blockDataAppendColInfo(pReadHandle->pResBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { goto _end; } } @@ -494,9 +510,19 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){ +int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid) { STsdbReadHandle* pTsdbReadHandle = reader; - if(pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo); + if (pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo); + pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromUid(pTsdbReadHandle, uid); + if (pTsdbReadHandle->pTableCheckInfo == NULL) { + return TSDB_CODE_TDB_OUT_OF_MEMORY; + } + return TDB_CODE_SUCCESS; +} + +int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList) { + STsdbReadHandle* pTsdbReadHandle = reader; + if (pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo); pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList); if (pTsdbReadHandle->pTableCheckInfo == NULL) { return TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -505,8 +531,8 @@ int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){ } tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId, - uint64_t taskId) { - if(taosArrayGetSize(tableList) == 0){ + uint64_t taskId) { + if (taosArrayGetSize(tableList) == 0) { return NULL; } STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); @@ -553,8 +579,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* t } tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle, - taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList), - pTsdbReadHandle->idStr); + taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList), pTsdbReadHandle->idStr); return (tsdbReaderT)pTsdbReadHandle; } @@ -1073,7 +1098,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio } int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerMin[precision])); // set the starting fileId - if (fid < 0LL && llabs(fid) > INT32_MAX) { // data value overflow for INT32 + if (fid < 0LL && llabs(fid) > INT32_MAX) { // data value overflow for INT32 fid = INT32_MIN; } @@ -2612,7 +2637,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files - if ((win.skey > pTsdbReadHandle->window.ekey)/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { + if ((win.skey > pTsdbReadHandle->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); @@ -2886,7 +2911,7 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) { */ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid); - if(!pCur) { + if (!pCur) { return TSDB_CODE_FAILED; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6b5826bf886062d7aa4bf8f4837f5f2d23e924a1..9872f26b036712758f3358bf45f249a603a3190b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -248,6 +248,11 @@ typedef struct SSampleExecInfo { uint32_t seed; // random seed value } SSampleExecInfo; +enum { + TABLE_SCAN__TABLE_ORDER = 1, + TABLE_SCAN__BLOCK_ORDER = 2, +}; + typedef struct STableScanInfo { void* dataReader; SReadHandle readHandle; @@ -272,13 +277,16 @@ typedef struct STableScanInfo { int32_t curTWinIdx; int32_t currentGroupId; + int32_t currentTable; uint64_t queryId; // todo remove it uint64_t taskId; // todo remove it struct { uint64_t uid; - int64_t t; - } scanStatus; + int64_t ts; + } lastStatus; + + int8_t scanMode; } STableScanInfo; typedef struct STagScanInfo { @@ -713,6 +721,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); +int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts); int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts); SSDataBlock* loadNextDataBlock(void* param); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 0871575d9521f14265e78b2c7f66dbb57b31f1c4..8dd8005225aa6f7191307869f0d218167b2cedf2 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -222,7 +222,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { } int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { - SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*) tinfo; + SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; if (pTaskInfo == NULL || pInput == NULL || len == 0) { return TSDB_CODE_INVALID_PARA; @@ -231,11 +231,20 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le return decodeOperator(pTaskInfo->pRoot, pInput, len); } +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; -int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + if (uid == 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } - return TSDB_CODE_SUCCESS; + return doPrepareScan(pTaskInfo->pRoot, uid, ts); } +int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return doGetScanStatus(pTaskInfo->pRoot, uid, ts); +} diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 95782ce0016a614eab97e53fc592ad569ea3af24..dd4822984939f300a49432ced73593aafa67f7e2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1033,7 +1033,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData SqlFunctionCtx* pCtx = pTableScanInfo->pCtx; uint32_t status = BLK_DATA_NOT_LOAD; - int32_t numOfOutput = 0;//pTableScanInfo->numOfOutput; + int32_t numOfOutput = 0; // pTableScanInfo->numOfOutput; for (int32_t i = 0; i < numOfOutput; ++i) { int32_t functionId = pCtx[i].functionId; int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId; @@ -2821,13 +2821,53 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } +int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { + int32_t type = pOperator->operatorType; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamBlockScanInfo* pScanInfo = pOperator->info; + pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; + + STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; + + /*if (pSnapShotScanInfo->dataReader == NULL) {*/ + /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ + /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ + /*}*/ + + if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { + tsdbSetTableId(pInfo->dataReader, uid); + SQueryTableDataCond tmpCond = pInfo->cond; + tmpCond.twindows[0] = (STimeWindow){ + .skey = ts, + .ekey = INT64_MAX, + }; + tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); + pInfo->scanTimes = 0; + pInfo->curTWinIdx = 0; + } + + } else { + if (pOperator->numOfDownstream == 1) { + return doPrepareScan(pOperator->pDownstream[0], uid, ts); + } else if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block"); + return TSDB_CODE_QRY_APP_ERROR; + } else { + qError("join not supported for stream block scan"); + return TSDB_CODE_QRY_APP_ERROR; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamBlockScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; - *uid = pSnapShotScanInfo->scanStatus.uid; - *ts = pSnapShotScanInfo->scanStatus.t; + STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; + *uid = pSnapShotScanInfo->lastStatus.uid; + *ts = pSnapShotScanInfo->lastStatus.ts; } else { if (pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 34e5ad1bcac331caa9bf79b7fb6cebbe7c75dccd..2d6b019183c61d9309f1610120c6989303810617 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -392,6 +392,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { binfo.capacity = binfo.rows; blockDataEnsureCapacity(pBlock, binfo.rows); pBlock->info = binfo; + ASSERT(binfo.uid != 0); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -416,9 +417,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; // todo refactor - pTableScanInfo->scanStatus.uid = pBlock->info.uid; - pTableScanInfo->scanStatus.t = pBlock->info.window.ekey; + pTableScanInfo->lastStatus.uid = pBlock->info.uid; + pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey; + ASSERT(pBlock->info.uid != 0); return pBlock; } return NULL; @@ -438,6 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { + ASSERT(p->info.uid != 0); return p; } pTableScanInfo->curTWinIdx += 1; @@ -513,6 +516,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + // if scan table by table + if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { + // check status + while (1) { + SSDataBlock* result = doTableScanGroup(pOperator); + if (result) { + return result; + } + // if no data, switch to next table and continue scan + pInfo->currentTable++; + if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) { + return NULL; + } + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); + /*pTableInfo->uid */ + tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); + tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + pInfo->scanTimes = 0; + pInfo->curTWinIdx = 0; + } + } + if (pInfo->currentGroupId == -1) { pInfo->currentGroupId++; if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { @@ -1207,6 +1232,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle) { SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; + + SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); + if (pHandle->tqReader) { + pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; + pSTInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0); + } + if (pSTInfo->interval.interval > 0) { pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); } else { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 52cd7069e254624757bb61ef8520c6e58eae99db..03f8a228d576355c104ee1ec57410f37b336b430 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -39,6 +39,21 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); } +void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) { + SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + pVal->literal = NULL; + pVal->isDuration = false; + pVal->translate = true; + pVal->notReserved = true; + pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT; + pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes; + pVal->node.resType.precision = precision; + pVal->datum.i = (int64_t)precision; + pVal->typeData = (int64_t)precision; + + nodesListMakeAppend(pList, (SNode*)pVal); +} + // There is only one parameter of numeric type, and the return type is parameter type static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { @@ -220,8 +235,20 @@ static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // pseudo column do not need to check parameters + + //add database precision as param + uint8_t dbPrec = pFunc->node.resType.precision; + addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + + pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters + pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; return TSDB_CODE_SUCCESS; } @@ -990,6 +1017,10 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + //add database precision as param + uint8_t dbPrec = pFunc->node.resType.precision; + addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; return TSDB_CODE_SUCCESS; } @@ -1379,20 +1410,6 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { return true; } -void static addDbPrecisonParam(SNodeList* pList, uint8_t precision) { - SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - pVal->literal = NULL; - pVal->isDuration = false; - pVal->translate = true; - pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT; - pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes; - pVal->node.resType.precision = precision; - pVal->datum.i = (int64_t)precision; - pVal->typeData = (int64_t)precision; - - nodesListAppend(pList, (SNode*)pVal); -} - void static addTimezoneParam(SNodeList* pList) { char buf[6] = {0}; time_t t = taosTime(NULL); @@ -1462,7 +1479,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int //add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(pFunc->pParameterList, dbPrec); + addDbPrecisonParam(&pFunc->pParameterList, dbPrec); pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; @@ -1482,7 +1499,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_ //add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(pFunc->pParameterList, dbPrec); + addDbPrecisonParam(&pFunc->pParameterList, dbPrec); pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; @@ -1510,7 +1527,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le //add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(pFunc->pParameterList, dbPrec); + addDbPrecisonParam(&pFunc->pParameterList, dbPrec); pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; @@ -2479,7 +2496,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "now", .type = FUNCTION_TYPE_NOW, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, - .translateFunc = translateTimePseudoColumn, + .translateFunc = translateNowToday, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = nowFunction, @@ -2489,7 +2506,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "today", .type = FUNCTION_TYPE_TODAY, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, - .translateFunc = translateTimePseudoColumn, + .translateFunc = translateNowToday, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = todayFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e40ab3784d415bedc6cc155e22edf063833162c0..07b2a2375f227aff8c19a9c2d69381c0341426bf 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4283,7 +4283,7 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; // TODO: process timeUnit for different db precisions - int32_t timeUnit = 1000; + int32_t timeUnit = 1; if (pCtx->numOfParams == 5) { // TODO: param number incorrect timeUnit = pCtx->param[3].param.i; } @@ -5508,7 +5508,7 @@ int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - double result = doCalcRate(pInfo, 1000); + double result = doCalcRate(pInfo, (double)TSDB_TICK_PER_SECOND(pCtx->param[1].param.i)); colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes); return pResInfo->numOfRes; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index b433a880015beaaa282be72f1a4063eae1a5c3c6..908149c3ea09675c8d764216972a00fc2863330a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -20,7 +20,9 @@ #include "systable.h" #pragma GCC diagnostic push +#ifdef COMPILER_SUPPORTS_CXX13 #pragma GCC diagnostic ignored "-Wformat-truncation" +#endif int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)) = {0}; int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0}; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index dc4bf33a2b5af2d9718db970823dc8469a40ba32..76c0e4874045d503db45ee498c743149e9e2b4ee 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1515,7 +1515,10 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI); + int64_t timePrec; + GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData); + + int64_t ts = taosGetTimestamp(timePrec); for (int32_t i = 0; i < pInput->numOfRows; ++i) { colDataAppendInt64(pOutput->columnData, i, &ts); } @@ -1524,7 +1527,10 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu } int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI); + int64_t timePrec; + GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData); + + int64_t ts = taosGetTimestampToday(timePrec); for (int32_t i = 0; i < pInput->numOfRows; ++i) { colDataAppendInt64(pOutput->columnData, i, &ts); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cc114ac55f09730fc33fe1a762ab8ddca36d55f6..2a3cb88c1f02621698d0505dbd664ea1c83c3f6f 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -129,6 +129,7 @@ ./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim ./test.sh -f tsim/tmq/topic.sim ./test.sh -f tsim/tmq/snapshot.sim +./test.sh -f tsim/tmq/snapshot1.sim # --- stable ./test.sh -f tsim/stable/disk.sim diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim new file mode 100644 index 0000000000000000000000000000000000000000..9226c475cb44233e0c0690c073f9b58b0f92e3e2 --- /dev/null +++ b/tests/script/tsim/tmq/snapshot1.sim @@ -0,0 +1,310 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-1vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$ifmanualcommit = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest' +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . , +$keyList = $keyList . enable.auto.commit:false +#$keyList = $keyList . , +#$keyList = $keyList . auto.commit.interval.ms:6000 +#$keyList = $keyList . , +#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . ' +print ========== key list: $keyList + +$topicNum = 2 + +#=============================== start consume =============================# + + +print ================ test consume from stb +print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all +$topicList = ' . topic_stb_column +$topicList = $topicList . , +$topicList = $topicList . topic_stb_all +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb +$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + + +$topicList = ' . topic_stb_all +$topicList = $topicList . , +$topicList = $topicList . topic_stb_function +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1 +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1 + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb. +if $data[0][3] < $totalMsgOfOneTopic then + return -1 +endi +if $data[0][3] > $totalMsgOfStb then + return -1 +endi +if $data[1][3] < $totalMsgOfOneTopic then + return -1 +endi +if $data[1][3] > $totalMsgOfStb then + return -1 +endi + +$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb +$sumOfRows = $data[0][3] + $data[1][3] +if $sumOfRows != $totalMsgCons then + print actual: $sumOfRows + print expect: $totalMsgCons + return -1 +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb1 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ctb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ctb +print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all +$topicList = ' . topic_ctb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . ' +$consumerId = 0 + +$totalMsgOfOneTopic = $rowsPerCtb +$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +$topicList = ' . topic_ctb_function +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1 +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1 + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][3] == $totalMsgOfOneTopic then + if $data[1][3] == $totalMsgOfCtb then + goto check_ok_1 + endi +elif $data[1][3] == $totalMsgOfOneTopic then + if $data[0][3] == $totalMsgOfCtb then + goto check_ok_1 + endi +endi +return -1 +check_ok_1: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb2 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ntb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ntb +print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all +$topicList = ' . topic_ntb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfOneTopic = $rowsPerCtb +$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + + +$topicList = ' . topic_ntb_function +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1 +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1 + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + + +if $data[0][3] == $totalMsgOfOneTopic then + if $data[1][3] == $totalMsgOfNtb then + goto check_ok_3 + endi +elif $data[1][3] == $totalMsgOfOneTopic then + if $data[0][3] == $totalMsgOfNtb then + goto check_ok_3 + endi +endi +return -1 +check_ok_3: + +sql select * from performance_schema.`consumers` +if $rows != 0 then + return -1 +endi + +#sql select * from performance_schema.`subscriptions` +#if $rows != 0 then +# return -1 +#endi + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/stateduration.py b/tests/system-test/2-query/stateduration.py index fa71009ef210e6a14c5abe04fbfe0f0b95c6598a..23169553dca96990edab951bbabb457617832db3 100644 --- a/tests/system-test/2-query/stateduration.py +++ b/tests/system-test/2-query/stateduration.py @@ -38,15 +38,15 @@ class TDTestCase: tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.checkRows(10) if j in ['LT' ,'lt','Lt','lT']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in ['GT','gt', 'Gt','gT']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) elif j in ['LE','le','Le','lE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in [ 'GE','ge','Ge','gE']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,), (0,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) elif j in ['NE','ne','Ne','nE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (0,), (0,), (0,), (0,), (0,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: @@ -54,11 +54,11 @@ class TDTestCase: tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.checkRows(10) if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) elif j in ['NE','ne','Ne','nE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) @@ -66,34 +66,34 @@ class TDTestCase: for i in error_column_list: for j in self.param_list: tdSql.error(f"select stateduration({i},{j},5) from test") - + error_param_list = ['a',1] for i in error_param_list: tdSql.error(f"select stateduration(col1,{i},5) from test") - + # timestamp = 1s, time_unit =1s tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') for i in range(self.row_num): tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - + for i in integer_list: for j in self.param_list: tdSql.query(f"select stateduration(col{i},'{j}',5) from test1") tdSql.checkRows(10) # print(tdSql.queryResult) if j in ['LT' ,'lt','Lt','lT']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in ['GT','gt', 'Gt','gT']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)]) elif j in ['LE','le','Le','lE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in [ 'GE','ge','Ge','gE']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,), (5000,)]) elif j in ['NE','ne','Ne','nE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) - elif j in ['EQ','eq','Eq','eQ']: + tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)]) + elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: for j in self.param_list: @@ -101,22 +101,22 @@ class TDTestCase: tdSql.checkRows(10) print(tdSql.queryResult) if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)]) elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: - tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) + tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)]) elif j in ['NE','ne','Ne','nE']: - tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) + tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (5000,), (6000,), (7000,), (8000,), (9000,)]) elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) # timestamp = 1m, time_unit =1m - tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') for i in range(self.row_num): - tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - + for i in integer_list: for j in self.param_list: tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2") @@ -132,7 +132,7 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) elif j in ['NE','ne','Ne','nE']: tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) - elif j in ['EQ','eq','Eq','eQ']: + elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: for j in self.param_list: @@ -147,14 +147,14 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) - + # timestamp = 1h, time_unit =1h - tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') for i in range(self.row_num): - tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - + for i in integer_list: for j in self.param_list: tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3") @@ -170,7 +170,7 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) elif j in ['NE','ne','Ne','nE']: tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) - elif j in ['EQ','eq','Eq','eQ']: + elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: for j in self.param_list: @@ -202,7 +202,7 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)]) elif j in ['NE','ne','Ne','nE']: tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)]) - elif j in ['EQ','eq','Eq','eQ']: + elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: for j in self.param_list: @@ -219,13 +219,13 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) # for stb - tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''') tdSql.execute('create table stb_1 using stb tags(1)') for i in range(self.row_num): - tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - + for i in integer_list: for j in self.param_list: tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb") @@ -241,7 +241,7 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) elif j in ['NE','ne','Ne','nE']: tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) - elif j in ['EQ','eq','Eq','eQ']: + elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) for i in float_list: for j in self.param_list: @@ -256,10 +256,10 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) elif j in ['EQ','eq','Eq','eQ']: tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) - + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index de475803fe3297cc56143baaa3639a2e9e8d2553..b99704b60233e9e4fd6f96d368bb5f07d17da8ba 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -14,19 +14,24 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: - paraDict = {'dbName': 'db12', - 'dropFlag':1, - 'vgroups': 4, - 'precision': 'ms', - 'stbName': 'stb0', - 'ctbNum': 10, + paraDict = {'dbName': 'db12', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb0', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':16, 'count':1}, {'type': 'timestamp','count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, 'rowsPerTbl': 10000, - 'batchNum': 10, - 'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000 - 'event':'', - 'columnDict': {'int':2}, - 'tagDict': {'int':1} - } + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 20, + 'showMsg': 1, + 'showRow': 1} cdbName = 'cdb' # some parameter to consumer processor @@ -57,17 +62,18 @@ class TDTestCase: tmqCom.initConsumerTable(self.cdbName) - tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) + tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"]) self.paraDict["stbName"] = 'stb1' - tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) - tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) - tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"]) + tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) + tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) self.paraDict["stbName"] = 'stb2' - tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) - tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) - tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + self.paraDict["ctbPrefix"] = 'newctb' + tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"]) + tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) + tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) tdLog.info("create topics from db") topicName1 = 'topic_%s'%(self.paraDict['dbName']) @@ -97,7 +103,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.exit("tmq consume rows error!") - time.sleep(15) + time.sleep(10) tdSql.query("drop topic %s"%topicName1) tdLog.printNoPrefix("======== test case 12 end ...... ") diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 9254f57c40342761e144053e751b3412efaf6b0a..b545340153477ed8cab9d18e4e0be4bd14c427de 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -76,6 +76,22 @@ class TMQCom: resultList.append(tdSql.getData(i , 3)) return resultList + + def selectConsumeMsgResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 2)) + + return resultList def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0): buildPath = tdCom.getBuildPath() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index f6dfe95f453f2e38acafa185cc74cca8cea1dac5..44a3afcf73f7290a9bdf32aa667aea5d947597cb 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -136,6 +136,7 @@ python3 ./test.py -f 7-tmq/subscribeDb0.py python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py +#python3 ./test.py -f 7-tmq/subscribeDb4.py python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 38ee543f10be01c64402fdf0e53362e930c1d4a9..7d57ef0781a423fe3de9b7c325971bac4bd6d223 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -563,7 +563,7 @@ void build_consumer(SThreadInfo* pInfo) { // tmq_conf_set(conf, "auto.offset.reset", "latest"); // if (g_stConfInfo.useSnapshot) { - tmq_conf_set(conf, "experiment.use.snapshot", "true"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); } pInfo->tmq = tmq_consumer_new(conf, NULL, 0); @@ -633,6 +633,9 @@ void loop_consume(SThreadInfo* pInfo) { } } + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); while (running) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); @@ -645,6 +648,13 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 10 * 1000) { + taosFprintfFile(g_fp, "consumer id %d has currently poll total msgs: %" PRId64 "\n", pInfo->consumerId, + totalMsgs); + lastPrintTime = currentPrintTime; + } + if (0 == once_flag) { once_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); @@ -674,8 +684,6 @@ void loop_consume(SThreadInfo* pInfo) { } void* consumeThreadFunc(void* param) { - int32_t totalMsgs = 0; - SThreadInfo* pInfo = (SThreadInfo*)param; pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -857,12 +865,27 @@ int main(int32_t argc, char* argv[]) { (void*)(&(g_stConfInfo.stThreads[i]))); } + int64_t start = taosGetTimestampUs(); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadClear(&g_stConfInfo.stThreads[i].thread); } - // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + int64_t end = taosGetTimestampUs(); + + int64_t totalMsgs = 0; + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt; + } + + int64_t t = end - start; + if (0 == t) t = 1; + + double tInMs = (double)t / 1000000.0; + taosFprintfFile(g_fp, + "Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n", + tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs)); taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosCloseFile(&g_fp);