diff --git a/packaging/tools/makeclient_power.sh b/packaging/tools/makeclient_power.sh
index b4416a68bb30751d5e9b02f5e83186d750d5a935..faa5a03f52b4c9b56981a6b1c0918e543262b3bb 100755
--- a/packaging/tools/makeclient_power.sh
+++ b/packaging/tools/makeclient_power.sh
@@ -123,7 +123,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/R ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
cp -r ${examples_dir}/go ${install_dir}/examples
- sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
+ sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
fi
# Copy driver
mkdir -p ${install_dir}/driver
diff --git a/packaging/tools/makepkg_power.sh b/packaging/tools/makepkg_power.sh
index 3d625900c9d912ff835092c7c5675d618b42b06d..2c02b99787c6d5ad6234de2319bf78b0b09d7e8a 100755
--- a/packaging/tools/makepkg_power.sh
+++ b/packaging/tools/makepkg_power.sh
@@ -146,7 +146,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/R ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
cp -r ${examples_dir}/go ${install_dir}/examples
- sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
+ sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
fi
# Copy driver
mkdir -p ${install_dir}/driver
diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh
index 0feb64c795159b67920c8a39b53b0125dfb565bf..d91daaa5c44488e34dea7ec2ddec0863699446f2 100755
--- a/packaging/tools/post.sh
+++ b/packaging/tools/post.sh
@@ -10,6 +10,7 @@ data_dir="/var/lib/taos"
log_dir="/var/log/taos"
data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log"
+install_main_dir="/usr/local/taos"
# static directory
cfg_dir="/usr/local/taos/cfg"
@@ -134,6 +135,29 @@ function install_config() {
else
break
fi
+ done
+
+ # user email
+ #EMAIL_PATTERN='^[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$'
+ #EMAIL_PATTERN='^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$'
+ #EMAIL_PATTERN="^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$"
+ echo
+ echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
+ read emailAddr
+ while true; do
+ if [ ! -z "$emailAddr" ]; then
+ # check the format of the emailAddr
+ #if [[ "$emailAddr" =~ $EMAIL_PATTERN ]]; then
+ # Write the email address to temp file
+ email_file="${install_main_dir}/email"
+ ${csudo} bash -c "echo $emailAddr > ${email_file}"
+ break
+ #else
+ # read -p "Please enter the correct email address: " emailAddr
+ #fi
+ else
+ break
+ fi
done
}
diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c
index fc9c60b39b0cfa4b591cc77c1efcdac4e6647ce9..17be294531e51982924d84700fc21653ad231224 100644
--- a/src/query/src/qExtbuffer.c
+++ b/src/query/src/qExtbuffer.c
@@ -344,8 +344,6 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
return 0;
}
- assert(colIdx == 0);
-
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
return (f1 < f2) ? 1 : -1;
} else { // asc
diff --git a/tests/gotest/batchtest.bat b/tests/gotest/batchtest.bat
old mode 100644
new mode 100755
index abe9a58f319068d5e11017abcd721a4c54d6aca9..efd8961bb0be2eb6f20e291114b92b00469b984f
--- a/tests/gotest/batchtest.bat
+++ b/tests/gotest/batchtest.bat
@@ -7,6 +7,9 @@ set serverPort=%2
if "%severIp%"=="" (set severIp=127.0.0.1)
if "%serverPort%"=="" (set serverPort=6030)
+go env -w GO111MODULE=on
+go env -w GOPROXY=https://goproxy.io,direct
+
cd case001
case001.bat %severIp% %serverPort%
diff --git a/tests/gotest/batchtest.sh b/tests/gotest/batchtest.sh
old mode 100644
new mode 100755
index e8ed9ecbed9f70c98e6b5db052c3e69082c1794d..0fbbf40714b3349651beea9302e66628b31a22ac
--- a/tests/gotest/batchtest.sh
+++ b/tests/gotest/batchtest.sh
@@ -13,6 +13,9 @@ if [ ! -n "$serverPort" ]; then
serverPort=6030
fi
+go env -w GO111MODULE=on
+go env -w GOPROXY=https://goproxy.io,direct
+
bash ./case001/case001.sh $severIp $serverPort
#bash ./case002/case002.sh $severIp $serverPort
#bash ./case003/case003.sh $severIp $serverPort
diff --git a/tests/pytest/cluster/bananceTest.py b/tests/pytest/cluster/bananceTest.py
index 5323f926ace767ddc8b2d4518c41c76f70bdb068..ef25afa7d2f7ea3b5358f8ba74d6702d28d54c85 100644
--- a/tests/pytest/cluster/bananceTest.py
+++ b/tests/pytest/cluster/bananceTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/basicTest.py b/tests/pytest/cluster/basicTest.py
index 707ccb25bbbd98de3d3190708b9cdba3fe02726b..b990d7fd982a490383939707a32635d37e546b13 100644
--- a/tests/pytest/cluster/basicTest.py
+++ b/tests/pytest/cluster/basicTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/changeReplicaTest.py b/tests/pytest/cluster/changeReplicaTest.py
index d4696d0c6dfa0c5a920b0dc090a5ccac91e4cea3..7fa68edbfee2db599076befdf9bed5f4b4be3c83 100644
--- a/tests/pytest/cluster/changeReplicaTest.py
+++ b/tests/pytest/cluster/changeReplicaTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/clusterSetup.py b/tests/pytest/cluster/clusterSetup.py
index 14fb7e3a4df45cc5c68036a350b1e5a94854d399..36af8ac42e56e1b8a7ab2237305a6bf286103552 100644
--- a/tests/pytest/cluster/clusterSetup.py
+++ b/tests/pytest/cluster/clusterSetup.py
@@ -199,26 +199,4 @@ class ClusterTest:
thread.start()
for i in range(self.numberOfThreads):
- threads[i].join()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+ threads[i].join()
\ No newline at end of file
diff --git a/tests/pytest/cluster/dataFileRecoveryTest.py b/tests/pytest/cluster/dataFileRecoveryTest.py
index a7c9c73be77a1dd1d2a6d54733765a0f6dad2fb1..089d3fffc1499a8d9cafc87a8d94252111fcd604 100644
--- a/tests/pytest/cluster/dataFileRecoveryTest.py
+++ b/tests/pytest/cluster/dataFileRecoveryTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/fullDnodesTest.py b/tests/pytest/cluster/fullDnodesTest.py
index 3255991ab2c1bb37181b4df8e453bff0a93ff33d..3c4b10d97a24dfbb156122aa0afdbb5d22ce3941 100644
--- a/tests/pytest/cluster/fullDnodesTest.py
+++ b/tests/pytest/cluster/fullDnodesTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/killAndRestartDnodesTest.py b/tests/pytest/cluster/killAndRestartDnodesTest.py
index e62048bab526c54414eb0d4b1277c8afad5e102a..be927e862f616c7fbe490e733a18984b6971ef1f 100644
--- a/tests/pytest/cluster/killAndRestartDnodesTest.py
+++ b/tests/pytest/cluster/killAndRestartDnodesTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/offlineThresholdTest.py b/tests/pytest/cluster/offlineThresholdTest.py
index 8e500f33e526ce2bdd8729aa873012ebaedacd1a..8373424f93c8217250907e09620c8523d63071ad 100644
--- a/tests/pytest/cluster/offlineThresholdTest.py
+++ b/tests/pytest/cluster/offlineThresholdTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/oneReplicaOfflineTest.py b/tests/pytest/cluster/oneReplicaOfflineTest.py
index 219959045a541e417ccf1c2d3016166952c8a988..0223dfe01add9faca7987d7767f5c41a58b8edd2 100644
--- a/tests/pytest/cluster/oneReplicaOfflineTest.py
+++ b/tests/pytest/cluster/oneReplicaOfflineTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/queryTimeTest.py b/tests/pytest/cluster/queryTimeTest.py
index 8b5591f886050ed2afbac86fd7857f97e65323f8..74a9081ccf4fd8abc175e2e0c82b0c6feedcbb26 100644
--- a/tests/pytest/cluster/queryTimeTest.py
+++ b/tests/pytest/cluster/queryTimeTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/stopAllDnodesTest.py b/tests/pytest/cluster/stopAllDnodesTest.py
index 9f1588129ae35ca70d724546d1dca0c01414cda2..a71ae52e3d7a640bb589f3bafe16b2e4d45c7b93 100644
--- a/tests/pytest/cluster/stopAllDnodesTest.py
+++ b/tests/pytest/cluster/stopAllDnodesTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/stopTwoDnodesTest.py b/tests/pytest/cluster/stopTwoDnodesTest.py
index 109ca84a1e230b382b12bd0cb2f5b34013902e60..9e9958e2d32018b6a89a3e0d08da2c1597151ff2 100644
--- a/tests/pytest/cluster/stopTwoDnodesTest.py
+++ b/tests/pytest/cluster/stopTwoDnodesTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/cluster/syncingTest.py b/tests/pytest/cluster/syncingTest.py
index b2b8f6ec072a8a625cd101dc20fc2e14a1f3f25e..96be048d231e35f67e40fc4785d2e19337ed408b 100644
--- a/tests/pytest/cluster/syncingTest.py
+++ b/tests/pytest/cluster/syncingTest.py
@@ -12,7 +12,7 @@
# -*- coding: utf-8 -*-
import sys
-from clustertest import *
+from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh
index 4ffe35fc3c94edbdd194e03171696a1d681387c1..9cca23ac79e89224593bade2ed518eaa0ecd3da0 100755
--- a/tests/pytest/crash_gen.sh
+++ b/tests/pytest/crash_gen.sh
@@ -54,6 +54,7 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
+CRASH_GEN_EXEC=crash_gen_bootstrap.py
if [[ $1 == '--valgrind' ]]; then
shift
export PYTHONMALLOC=malloc
@@ -66,14 +67,14 @@ if [[ $1 == '--valgrind' ]]; then
--leak-check=yes \
--suppressions=crash_gen/valgrind_taos.supp \
$PYTHON_EXEC \
- ./crash_gen/crash_gen.py $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
+ $CRASH_GEN_EXEC $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
elif [[ $1 == '--helgrind' ]]; then
shift
valgrind \
--tool=helgrind \
$PYTHON_EXEC \
- ./crash_gen/crash_gen.py $@
+ $CRASH_GEN_EXEC $@
else
- $PYTHON_EXEC ./crash_gen/crash_gen.py $@
+ $PYTHON_EXEC $CRASH_GEN_EXEC $@
fi
diff --git a/tests/pytest/crash_gen/README.md b/tests/pytest/crash_gen/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..6788ab1a63d0a7c515558695605d1ec8ac5fb7f9
--- /dev/null
+++ b/tests/pytest/crash_gen/README.md
@@ -0,0 +1,130 @@
+
+# User's Guide to the Crash_Gen Tool
+
+# Introduction
+
+To effectively test and debug our TDengine product, we have developed a simple tool to
+exercise various functions of the system in a randomized fashion, hoping to expose
+the maximum number of problems without requiring a pre-determined scenario.
+
+# Preparation
+
+To run this tool, please ensure the following preparation work is done first.
+
+1. Fetch a copy of the TDengine source code, and build it successfully in the `build/`
+ directory
+1. Ensure that the system has Python3.8 or above properly installed. We use
+ Ubuntu 20.04LTS as our own development environment, and suggest you also use such
+ an environment if possible.
+
+# Simple Execution
+
+To run the tool with the simplest method, follow the steps below:
+
+1. Open a terminal window, start the `taosd` service in the `build/` directory
+ (or however you prefer to start the `taosd` service)
+1. Open another terminal window, go into the `tests/pytest/` directory, and
+ run `./crash_gen.sh -p -t 3 -s 10` (change the two parameters here as you wish)
+1. Watch the output to the end and see if you get a `SUCCESS` or `FAILURE`
+
+That's it!
+
+# Running Clusters
+
+This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
+can start a cluster quite easily with the following command:
+
+```
+$ cd tests/pytest/
+$ ./crash_gen.sh -e -o 3
+```
+
+The `-e` option above tells the tool to start the service, and not run any tests, while
+the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
+Obviously you can adjust the number here.
+
+## Behind the Scenes
+
+When the tool runs a cluster, it uses a number of directories, each holding the information
+for a single DNode, see:
+
+```
+$ ls build/cluster*
+build/cluster_dnode_0:
+cfg data log
+
+build/cluster_dnode_1:
+cfg data log
+
+build/cluster_dnode_2:
+cfg data log
+```
+
+Therefore, when something goes wrong and you want to reset everything with the cluster, simply
+erase all the files:
+
+```
+$ rm -rf build/cluster_dnode_*
+```
+
+## Addresses and Ports
+
+The DNodes in the cluster all bind to the `127.0.0.1` IP address (for now anyway), and
+use port 6030 for the first DNode, and 6130 for the 2nd one, and so on.
+
+## Testing Against a Cluster
+
+In a separate terminal window, you can invoke the tool in client mode and test against
+a cluster, such as:
+
+```
+$ ./crash_gen.sh -p -t 10 -s 100 -i 3
+```
+
+Here the `-i` option tells the tool to always create tables with 3 replicas, and run
+all tests against such tables.
+
+# Additional Features
+
+The exhaustive features of the tool are available through the `-h` option:
+
+```
+$ ./crash_gen.sh -h
+usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
+ [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
+
+TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
+---------------------------------------------------------------------
+1. You build TDengine in the top level ./build directory, as described in offical docs
+2. You run the server there before this script: ./build/bin/taosd -c test/cfg
+
+optional arguments:
+ -h, --help show this help message and exit
+ -a, --auto-start-service
+ Automatically start/stop the TDengine service (default: false)
+ -b MAX_DBS, --max-dbs MAX_DBS
+ Maximum number of DBs to keep, set to disable dropping DB. (default: 0)
+ -c CONNECTOR_TYPE, --connector-type CONNECTOR_TYPE
+ Connector type to use: native, rest, or mixed (default: 10)
+ -d, --debug Turn on DEBUG mode for more logging (default: false)
+ -e, --run-tdengine Run TDengine service in foreground (default: false)
+ -g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
+ Ignore error codes, comma separated, 0x supported (default: None)
+ -i MAX_REPLICAS, --max-replicas MAX_REPLICAS
+ Maximum number of replicas to use, when testing against clusters. (default: 1)
+ -l, --larger-data Write larger amount of data during write operations (default: false)
+ -n, --dynamic-db-table-names
+ Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
+ -o NUM_DNODES, --num-dnodes NUM_DNODES
+ Number of Dnodes to initialize, used with -e option. (default: 1)
+ -p, --per-thread-db-connection
+ Use a single shared db connection (default: false)
+ -r, --record-ops Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)
+ -s MAX_STEPS, --max-steps MAX_STEPS
+ Maximum number of steps to run (default: 100)
+ -t NUM_THREADS, --num-threads NUM_THREADS
+ Number of threads to run (default: 10)
+ -v, --verify-data Verify data written in a number of places by reading back (default: false)
+ -x, --continue-on-exception
+ Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
+```
+
diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py
index 48196ab383c974b5c5d3f5ebc54773cd846353e6..102d7d9bdd6a893014342f46552ae1b34cafc84c 100755
--- a/tests/pytest/crash_gen/crash_gen.py
+++ b/tests/pytest/crash_gen/crash_gen.py
@@ -14,42 +14,36 @@
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
-import taos
-from util.sql import *
-from util.cases import *
-from util.dnodes import *
-from util.log import *
-from queue import Queue, Empty
-from typing import IO
+
from typing import Set
from typing import Dict
from typing import List
-from requests.auth import HTTPBasicAuth
+from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
+
import textwrap
-import datetime
-import logging
import time
+import datetime
import random
+import logging
import threading
-import requests
import copy
import argparse
import getopt
import sys
import os
-import io
import signal
import traceback
import resource
from guppy import hpy
import gc
-try:
- import psutil
-except:
- print("Psutil module needed, please install: sudo pip3 install psutil")
- sys.exit(-1)
+from .service_manager import ServiceManager, TdeInstance
+from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
+from .db import DbConn, MyTDSql, DbConnNative, DbManager
+
+import taos
+import requests
# Require Python 3
if sys.version_info[0] < 3:
@@ -59,41 +53,37 @@ if sys.version_info[0] < 3:
# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
-gConfig = argparse.Namespace() # Dummy value, will be replaced later
-gSvcMgr = None # TODO: refactor this hack, use dep injection
-logger = None # type: Logger
-
-def runThread(wt: WorkerThread):
- wt.run()
-
-class CrashGenError(Exception):
- def __init__(self, msg=None, errno=None):
- self.msg = msg
- self.errno = errno
+gConfig: argparse.Namespace
+gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection
+# logger: logging.Logger
+gContainer: Container
- def __str__(self):
- return self.msg
+# def runThread(wt: WorkerThread):
+# wt.run()
class WorkerThread:
- def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
- # te: TaskExecutor,
- ): # note: main thread context!
+ def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
+ """
+ Note: this runs in the main thread context
+ """
# self._curStep = -1
self._pool = pool
self._tid = tid
self._tc = tc # type: ThreadCoordinator
# self.threadIdent = threading.get_ident()
- self._thread = threading.Thread(target=runThread, args=(self,))
+ # self._thread = threading.Thread(target=runThread, args=(self,))
+ self._thread = threading.Thread(target=self.run)
self._stepGate = threading.Event()
# Let us have a DB connection of our own
if (gConfig.per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type))
- if gConfig.connector_type == 'native':
- self._dbConn = DbConn.createNative()
+ tInst = gContainer.defTdeInstance
+ if gConfig.connector_type == 'native':
+ self._dbConn = DbConn.createNative(tInst.getDbTarget())
elif gConfig.connector_type == 'rest':
- self._dbConn = DbConn.createRest()
+ self._dbConn = DbConn.createRest(tInst.getDbTarget())
elif gConfig.connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance
self._dbConn = DbConn.createNative()
@@ -105,10 +95,10 @@ class WorkerThread:
# self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg):
- logger.debug(" TRD[{}] {}".format(self._tid, msg))
+ Logging.debug(" TRD[{}] {}".format(self._tid, msg))
def logInfo(self, msg):
- logger.info(" TRD[{}] {}".format(self._tid, msg))
+ Logging.info(" TRD[{}] {}".format(self._tid, msg))
# def dbInUse(self):
# return self._dbInUse
@@ -127,10 +117,10 @@ class WorkerThread:
def run(self):
# initialization after thread starts, in the thread context
# self.isSleeping = False
- logger.info("Starting to run thread: {}".format(self._tid))
+ Logging.info("Starting to run thread: {}".format(self._tid))
if (gConfig.per_thread_db_connection): # type: ignore
- logger.debug("Worker thread openning database connection")
+ Logging.debug("Worker thread openning database connection")
self._dbConn.open()
self._doTaskLoop()
@@ -140,7 +130,7 @@ class WorkerThread:
if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close()
else:
- logger.warning("Cleaning up worker thread, dbConn already closed")
+ Logging.warning("Cleaning up worker thread, dbConn already closed")
def _doTaskLoop(self):
# while self._curStep < self._pool.maxSteps:
@@ -151,15 +141,15 @@ class WorkerThread:
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
except threading.BrokenBarrierError as err: # main thread timed out
print("_bto", end="")
- logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
+ Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
break
- logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
+ Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped
- logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
+ Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning():
print("_wts", end="")
- logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
+ Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
@@ -178,15 +168,15 @@ class WorkerThread:
raise
# Fetch a task from the Thread Coordinator
- logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
+ Logging.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask()
# Execute such a task
- logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
+ Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
self._tid, task.__class__.__name__))
task.execute(self)
tc.saveExecutedTask(task)
- logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
+ Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
# self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died
@@ -209,7 +199,7 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped"
- logger.debug(
+ Logging.debug(
"[TRD] Worker thread {} about to cross the step gate".format(
self._tid))
self._stepGate.wait()
@@ -222,7 +212,7 @@ class WorkerThread:
self.verifyThreadMain() # only allowed for main thread
if self._thread.is_alive():
- logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
+ Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
else:
@@ -253,7 +243,7 @@ class WorkerThread:
class ThreadCoordinator:
- WORKER_THREAD_TIMEOUT = 60 # one minute
+ WORKER_THREAD_TIMEOUT = 180 # three minutes
def __init__(self, pool: ThreadPool, dbManager: DbManager):
self._curStep = -1 # first step is 0
@@ -267,7 +257,7 @@ class ThreadCoordinator:
self._stepBarrier = threading.Barrier(
self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats()
- self._runStatus = MainExec.STATUS_RUNNING
+ self._runStatus = Status.STATUS_RUNNING
self._initDbs()
def getTaskExecutor(self):
@@ -280,14 +270,14 @@ class ThreadCoordinator:
self._stepBarrier.wait(timeout)
def requestToStop(self):
- self._runStatus = MainExec.STATUS_STOPPING
+ self._runStatus = Status.STATUS_STOPPING
self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = gConfig.max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True
- if self._runStatus != MainExec.STATUS_RUNNING:
+ if self._runStatus != Status.STATUS_RUNNING:
return True
if transitionFailed:
return True
@@ -308,7 +298,7 @@ class ThreadCoordinator:
def _releaseAllWorkerThreads(self, transitionFailed):
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
# Now not all threads had time to go to sleep
- logger.debug(
+ Logging.debug(
"--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
# A new TE for the new step
@@ -316,7 +306,7 @@ class ThreadCoordinator:
if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep)
- logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
+ Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
self._curStep)) # Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's own task
self.tapAllThreads() # release all worker thread from their "gates"
@@ -325,10 +315,10 @@ class ThreadCoordinator:
# Now main thread (that's us) is ready to enter a step
# let other threads go past the pool barrier, but wait at the
# thread gate
- logger.debug("[TRD] Main thread about to cross the barrier")
+ Logging.debug("[TRD] Main thread about to cross the barrier")
self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
- logger.debug("[TRD] Main thread finished crossing the barrier")
+ Logging.debug("[TRD] Main thread finished crossing the barrier")
def _doTransition(self):
transitionFailed = False
@@ -336,11 +326,11 @@ class ThreadCoordinator:
for x in self._dbs:
db = x # type: Database
sm = db.getStateMachine()
- logger.debug("[STT] starting transitions for DB: {}".format(db.getName()))
+ Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
# at end of step, transiton the DB state
tasksForDb = db.filterTasks(self._executedTasks)
sm.transition(tasksForDb, self.getDbManager().getDbConn())
- logger.debug("[STT] transition ended for DB: {}".format(db.getName()))
+ Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))
# Due to limitation (or maybe not) of the TD Python library,
# we cannot share connections across threads
@@ -348,14 +338,14 @@ class ThreadCoordinator:
# Moving below to task loop
# if sm.hasDatabase():
# for t in self._pool.threadList:
- # logger.debug("[DB] use db for all worker threads")
+ # Logging.debug("[DB] use db for all worker threads")
# t.useDb()
# t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
if (err.msg == 'network unavailable'): # broken DB connection
- logger.info("DB connection broken, execution failed")
+ Logging.info("DB connection broken, execution failed")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
@@ -368,7 +358,7 @@ class ThreadCoordinator:
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step
- logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
+ Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
return transitionFailed
def run(self):
@@ -382,8 +372,9 @@ class ThreadCoordinator:
hasAbortedTask = False
workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
- if not gConfig.debug: # print this only if we are not in debug mode
- print(".", end="", flush=True)
+ if not gConfig.debug: # print this only if we are not in debug mode
+ Progress.emit(Progress.STEP_BOUNDARY)
+ # print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps
# memUsage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage
@@ -395,8 +386,9 @@ class ThreadCoordinator:
try:
self._syncAtBarrier() # For now just cross the barrier
+ Progress.emit(Progress.END_THREAD_STEP)
except threading.BrokenBarrierError as err:
- logger.info("Main loop aborted, caused by worker thread time-out")
+ Logging.info("Main loop aborted, caused by worker thread time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, important thread info:")
ts = ThreadStacks()
@@ -409,7 +401,7 @@ class ThreadCoordinator:
# threads are QUIET.
hasAbortedTask = self._hasAbortedTask() # from previous step
if hasAbortedTask:
- logger.info("Aborted task encountered, exiting test program")
+ Logging.info("Aborted task encountered, exiting test program")
self._execStats.registerFailure("Aborted Task Encountered")
break # do transition only if tasks are error free
@@ -420,29 +412,30 @@ class ThreadCoordinator:
transitionFailed = True
errno2 = Helper.convertErrno(err.errno) # correct error scheme
errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
- logger.info(errMsg)
+ Logging.info(errMsg)
traceback.print_exc()
self._execStats.registerFailure(errMsg)
# Then we move on to the next step
+ Progress.emit(Progress.BEGIN_THREAD_STEP)
self._releaseAllWorkerThreads(transitionFailed)
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
- logger.debug("Abnormal ending of main thraed")
+ Logging.debug("Abnormal ending of main thraed")
elif workerTimeout:
- logger.debug("Abnormal ending of main thread, due to worker timeout")
+ Logging.debug("Abnormal ending of main thread, due to worker timeout")
else: # regular ending, workers waiting at "barrier"
- logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
+ Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
self._syncAtBarrier()
self._te = None # No more executor, time to end
- logger.debug("Main thread tapping all threads one last time...")
+ Logging.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time
- logger.debug("\r\n\n--> Main thread ready to finish up...")
- logger.debug("Main thread joining all threads")
+ Logging.debug("\r\n\n--> Main thread ready to finish up...")
+ Logging.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish
- logger.info("\nAll worker threads finished")
+ Logging.info("\nAll worker threads finished")
self._execStats.endExec()
def cleanup(self): # free resources
@@ -474,7 +467,7 @@ class ThreadCoordinator:
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
- logger.debug(
+ Logging.debug(
"[TRD] Main thread waking up worker threads: {}".format(
str(wakeSeq)))
# TODO: set dice seed to a deterministic value
@@ -492,9 +485,11 @@ class ThreadCoordinator:
dbc = self.getDbManager().getDbConn()
if gConfig.max_dbs == 0:
self._dbs.append(Database(0, dbc))
- else:
+ else:
+ baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic
+ )*333) % 888 if gConfig.dynamic_db_table_names else 0
for i in range(gConfig.max_dbs):
- self._dbs.append(Database(i, dbc))
+ self._dbs.append(Database(baseDbNumber + i, dbc))
def pickDatabase(self):
idxDb = 0
@@ -512,7 +507,7 @@ class ThreadCoordinator:
# pick a task type for current state
db = self.pickDatabase()
- taskType = db.getStateMachine().pickTaskType() # type: Task
+ taskType = db.getStateMachine().pickTaskType() # dynamic name of class
return taskType(self._execStats, db) # create a task from it
def resetExecutedTasks(self):
@@ -522,13 +517,6 @@ class ThreadCoordinator:
with self._lock:
self._executedTasks.append(task)
-# We define a class to run a number of threads in locking steps.
-
-class Helper:
- @classmethod
- def convertErrno(cls, errno):
- return errno if (errno > 0) else 0x80000000 + errno
-
class ThreadPool:
def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads
@@ -546,7 +534,7 @@ class ThreadPool:
def joinAll(self):
for workerThread in self.threadList:
- logger.debug("Joining thread...")
+ Logging.debug("Joining thread...")
workerThread._thread.join()
def cleanup(self):
@@ -603,7 +591,7 @@ class LinearQueue():
def allocate(self, i):
with self._lock:
- # logger.debug("LQ allocating item {}".format(i))
+ # Logging.debug("LQ allocating item {}".format(i))
if (i in self.inUse):
raise RuntimeError(
"Cannot re-use same index in queue: {}".format(i))
@@ -611,7 +599,7 @@ class LinearQueue():
def release(self, i):
with self._lock:
- # logger.debug("LQ releasing item {}".format(i))
+ # Logging.debug("LQ releasing item {}".format(i))
self.inUse.remove(i) # KeyError possible, TODO: why?
def size(self):
@@ -633,357 +621,6 @@ class LinearQueue():
return ret
-class DbConn:
- TYPE_NATIVE = "native-c"
- TYPE_REST = "rest-api"
- TYPE_INVALID = "invalid"
-
- @classmethod
- def create(cls, connType):
- if connType == cls.TYPE_NATIVE:
- return DbConnNative()
- elif connType == cls.TYPE_REST:
- return DbConnRest()
- else:
- raise RuntimeError(
- "Unexpected connection type: {}".format(connType))
-
- @classmethod
- def createNative(cls):
- return cls.create(cls.TYPE_NATIVE)
-
- @classmethod
- def createRest(cls):
- return cls.create(cls.TYPE_REST)
-
- def __init__(self):
- self.isOpen = False
- self._type = self.TYPE_INVALID
- self._lastSql = None
-
- def getLastSql(self):
- return self._lastSql
-
- def open(self):
- if (self.isOpen):
- raise RuntimeError("Cannot re-open an existing DB connection")
-
- # below implemented by child classes
- self.openByType()
-
- logger.debug("[DB] data connection opened, type = {}".format(self._type))
- self.isOpen = True
-
- def queryScalar(self, sql) -> int:
- return self._queryAny(sql)
-
- def queryString(self, sql) -> str:
- return self._queryAny(sql)
-
- def _queryAny(self, sql): # actual query result as an int
- if (not self.isOpen):
- raise RuntimeError("Cannot query database until connection is open")
- nRows = self.query(sql)
- if nRows != 1:
- raise taos.error.ProgrammingError(
- "Unexpected result for query: {}, rows = {}".format(sql, nRows),
- (0x991 if nRows==0 else 0x992)
- )
- if self.getResultRows() != 1 or self.getResultCols() != 1:
- raise RuntimeError("Unexpected result set for query: {}".format(sql))
- return self.getQueryResult()[0][0]
-
- def use(self, dbName):
- self.execute("use {}".format(dbName))
-
- def existsDatabase(self, dbName: str):
- ''' Check if a certain database exists '''
- self.query("show databases")
- dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
- # ret2 = dbName in dbs
- # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
- return dbName in dbs # TODO: super weird type mangling seen, once here
-
- def hasTables(self):
- return self.query("show tables") > 0
-
- def execute(self, sql):
- ''' Return the number of rows affected'''
- raise RuntimeError("Unexpected execution, should be overriden")
-
- def safeExecute(self, sql):
- '''Safely execute any SQL query, returning True/False upon success/failure'''
- try:
- self.execute(sql)
- return True # ignore num of results, return success
- except taos.error.ProgrammingError as err:
- return False # failed, for whatever TAOS reason
- # Not possile to reach here, non-TAOS exception would have been thrown
-
- def query(self, sql) -> int: # return num rows returned
- ''' Return the number of rows affected'''
- raise RuntimeError("Unexpected execution, should be overriden")
-
- def openByType(self):
- raise RuntimeError("Unexpected execution, should be overriden")
-
- def getQueryResult(self):
- raise RuntimeError("Unexpected execution, should be overriden")
-
- def getResultRows(self):
- raise RuntimeError("Unexpected execution, should be overriden")
-
- def getResultCols(self):
- raise RuntimeError("Unexpected execution, should be overriden")
-
-# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
-
-
-class DbConnRest(DbConn):
- def __init__(self):
- super().__init__()
- self._type = self.TYPE_REST
- self._url = "http://localhost:6041/rest/sql" # fixed for now
- self._result = None
-
- def openByType(self): # Open connection
- pass # do nothing, always open
-
- def close(self):
- if (not self.isOpen):
- raise RuntimeError("Cannot clean up database until connection is open")
- # Do nothing for REST
- logger.debug("[DB] REST Database connection closed")
- self.isOpen = False
-
- def _doSql(self, sql):
- self._lastSql = sql # remember this, last SQL attempted
- try:
- r = requests.post(self._url,
- data = sql,
- auth = HTTPBasicAuth('root', 'taosdata'))
- except:
- print("REST API Failure (TODO: more info here)")
- raise
- rj = r.json()
- # Sanity check for the "Json Result"
- if ('status' not in rj):
- raise RuntimeError("No status in REST response")
-
- if rj['status'] == 'error': # clearly reported error
- if ('code' not in rj): # error without code
- raise RuntimeError("REST error return without code")
- errno = rj['code'] # May need to massage this in the future
- # print("Raising programming error with REST return: {}".format(rj))
- raise taos.error.ProgrammingError(
- rj['desc'], errno) # todo: check existance of 'desc'
-
- if rj['status'] != 'succ': # better be this
- raise RuntimeError(
- "Unexpected REST return status: {}".format(
- rj['status']))
-
- nRows = rj['rows'] if ('rows' in rj) else 0
- self._result = rj
- return nRows
-
- def execute(self, sql):
- if (not self.isOpen):
- raise RuntimeError(
- "Cannot execute database commands until connection is open")
- logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
- nRows = self._doSql(sql)
- logger.debug(
- "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
- return nRows
-
- def query(self, sql): # return rows affected
- return self.execute(sql)
-
- def getQueryResult(self):
- return self._result['data']
-
- def getResultRows(self):
- print(self._result)
- raise RuntimeError("TBD")
- # return self._tdSql.queryRows
-
- def getResultCols(self):
- print(self._result)
- raise RuntimeError("TBD")
-
- # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
-
-
-class MyTDSql:
- # Class variables
- _clsLock = threading.Lock() # class wide locking
- longestQuery = None # type: str
- longestQueryTime = 0.0 # seconds
- lqStartTime = 0.0
- # lqEndTime = 0.0 # Not needed, as we have the two above already
-
- def __init__(self, hostAddr, cfgPath):
- # Make the DB connection
- self._conn = taos.connect(host=hostAddr, config=cfgPath)
- self._cursor = self._conn.cursor()
-
- self.queryRows = 0
- self.queryCols = 0
- self.affectedRows = 0
-
- # def init(self, cursor, log=True):
- # self.cursor = cursor
- # if (log):
- # caller = inspect.getframeinfo(inspect.stack()[1][0])
- # self.cursor.log(caller.filename + ".sql")
-
- def close(self):
- self._cursor.close() # can we double close?
- self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
- self._cursor.close()
-
- def _execInternal(self, sql):
- startTime = time.time()
- ret = self._cursor.execute(sql)
- # print("\nSQL success: {}".format(sql))
- queryTime = time.time() - startTime
- # Record the query time
- cls = self.__class__
- if queryTime > (cls.longestQueryTime + 0.01) :
- with cls._clsLock:
- cls.longestQuery = sql
- cls.longestQueryTime = queryTime
- cls.lqStartTime = startTime
- return ret
-
- def query(self, sql):
- self.sql = sql
- try:
- self._execInternal(sql)
- self.queryResult = self._cursor.fetchall()
- self.queryRows = len(self.queryResult)
- self.queryCols = len(self._cursor.description)
- except Exception as e:
- # caller = inspect.getframeinfo(inspect.stack()[1][0])
- # args = (caller.filename, caller.lineno, sql, repr(e))
- # tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
- raise
- return self.queryRows
-
- def execute(self, sql):
- self.sql = sql
- try:
- self.affectedRows = self._execInternal(sql)
- except Exception as e:
- # caller = inspect.getframeinfo(inspect.stack()[1][0])
- # args = (caller.filename, caller.lineno, sql, repr(e))
- # tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
- raise
- return self.affectedRows
-
-
-class DbConnNative(DbConn):
- # Class variables
- _lock = threading.Lock()
- _connInfoDisplayed = False
- totalConnections = 0 # Not private
-
- def __init__(self):
- super().__init__()
- self._type = self.TYPE_NATIVE
- self._conn = None
- # self._cursor = None
-
- def getBuildPath(self):
- selfPath = os.path.dirname(os.path.realpath(__file__))
- if ("community" in selfPath):
- projPath = selfPath[:selfPath.find("communit")]
- else:
- projPath = selfPath[:selfPath.find("tests")]
-
- buildPath = None
- for root, dirs, files in os.walk(projPath):
- if ("taosd" in files):
- rootRealPath = os.path.dirname(os.path.realpath(root))
- if ("packaging" not in rootRealPath):
- buildPath = root[:len(root) - len("/build/bin")]
- break
- if buildPath == None:
- raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
- .format(selfPath, projPath))
- return buildPath
-
-
- def openByType(self): # Open connection
- cfgPath = self.getBuildPath() + "/test/cfg"
- hostAddr = "127.0.0.1"
-
- cls = self.__class__ # Get the class, to access class variables
- with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
- if not cls._connInfoDisplayed:
- cls._connInfoDisplayed = True # updating CLASS variable
- logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
- # Make the connection
- # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
- # self._cursor = self._conn.cursor()
- # Record the count in the class
- self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
- cls.totalConnections += 1
-
- self._tdSql.execute('reset query cache')
- # self._cursor.execute('use db') # do this at the beginning of every
-
- # Open connection
- # self._tdSql = MyTDSql()
- # self._tdSql.init(self._cursor)
-
- def close(self):
- if (not self.isOpen):
- raise RuntimeError("Cannot clean up database until connection is open")
- self._tdSql.close()
- # Decrement the class wide counter
- cls = self.__class__ # Get the class, to access class variables
- with cls._lock:
- cls.totalConnections -= 1
-
- logger.debug("[DB] Database connection closed")
- self.isOpen = False
-
- def execute(self, sql):
- if (not self.isOpen):
- raise RuntimeError("Cannot execute database commands until connection is open")
- logger.debug("[SQL] Executing SQL: {}".format(sql))
- self._lastSql = sql
- nRows = self._tdSql.execute(sql)
- logger.debug(
- "[SQL] Execution Result, nRows = {}, SQL = {}".format(
- nRows, sql))
- return nRows
-
- def query(self, sql): # return rows affected
- if (not self.isOpen):
- raise RuntimeError(
- "Cannot query database until connection is open")
- logger.debug("[SQL] Executing SQL: {}".format(sql))
- self._lastSql = sql
- nRows = self._tdSql.query(sql)
- logger.debug(
- "[SQL] Query Result, nRows = {}, SQL = {}".format(
- nRows, sql))
- return nRows
- # results are in: return self._tdSql.queryResult
-
- def getQueryResult(self):
- return self._tdSql.queryResult
-
- def getResultRows(self):
- return self._tdSql.queryRows
-
- def getResultCols(self):
- return self._tdSql.queryCols
-
-
class AnyState:
STATE_INVALID = -1
STATE_EMPTY = 0 # nothing there, no even a DB
@@ -1232,7 +869,7 @@ class StateMechine:
def init(self, dbc: DbConn): # late initailization, don't save the dbConn
self._curState = self._findCurrentState(dbc) # starting state
- logger.debug("Found Starting State: {}".format(self._curState))
+ Logging.debug("Found Starting State: {}".format(self._curState))
# TODO: seems no lnoger used, remove?
def getCurrentState(self):
@@ -1270,7 +907,7 @@ class StateMechine:
raise RuntimeError(
"No suitable task types found for state: {}".format(
self._curState))
- logger.debug(
+ Logging.debug(
"[OPS] Tasks found for state {}: {}".format(
self._curState,
typesToStrings(taskTypes)))
@@ -1280,27 +917,27 @@ class StateMechine:
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
dbName =self._db.getName()
if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?!
- logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
+ Logging.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty()
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
dbc.use(dbName)
if not dbc.hasTables(): # no tables
- logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
+ Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly()
sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc, dbName): # no regular tables
- logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
+ Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly()
else: # has actual tables
- logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
+ Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
return StateHasData()
# We transition the system to a new state by examining the current state itself
def transition(self, tasks, dbc: DbConn):
if (len(tasks) == 0): # before 1st step, or otherwise empty
- logger.debug("[STT] Starting State: {}".format(self._curState))
+ Logging.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing
# this should show up in the server log, separating steps
@@ -1336,7 +973,7 @@ class StateMechine:
# Nothing for sure
newState = self._findCurrentState(dbc)
- logger.debug("[STT] New DB state determined: {}".format(newState))
+ Logging.debug("[STT] New DB state determined: {}".format(newState))
# can old state move to new state through the tasks?
self._curState.verifyTasksToState(tasks, newState)
self._curState = newState
@@ -1354,7 +991,7 @@ class StateMechine:
# read data task, default to 10: TODO: change to a constant
weights.append(10)
i = self._weighted_choice_sub(weights)
- # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
+ # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i]
# ref:
@@ -1372,6 +1009,8 @@ class Database:
possibly in a cluster environment.
For now we use it to manage state transitions in that database
+
+ TODO: consider moving, but keep in mind it contains "StateMachine"
'''
_clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer
@@ -1433,7 +1072,7 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above
- logger.info("Setting up TICKS to start from: {}".format(t4))
+ Logging.info("Setting up TICKS to start from: {}".format(t4))
return t4
@classmethod
@@ -1468,64 +1107,6 @@ class Database:
return ret
-class DbManager():
- ''' This is a wrapper around DbConn(), to make it easier to use.
-
- TODO: rename this to DbConnManager
- '''
- def __init__(self):
- self.tableNumQueue = LinearQueue() # TODO: delete?
- # self.openDbServerConnection()
- self._dbConn = DbConn.createNative() if (
- gConfig.connector_type == 'native') else DbConn.createRest()
- try:
- self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
- except taos.error.ProgrammingError as err:
- # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
- if (err.msg == 'client disconnected'): # cannot open DB connection
- print(
- "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
- sys.exit(2)
- else:
- print("Failed to connect to DB, errno = {}, msg: {}"
- .format(Helper.convertErrno(err.errno), err.msg))
- raise
- except BaseException:
- print("[=] Unexpected exception")
- raise
-
- # Do this after dbConn is in proper shape
- # Moved to Database()
- # self._stateMachine = StateMechine(self._dbConn)
-
- def getDbConn(self):
- return self._dbConn
-
- # TODO: not used any more, to delete
- def pickAndAllocateTable(self): # pick any table, and "use" it
- return self.tableNumQueue.pickAndAllocate()
-
- # TODO: Not used any more, to delete
- def addTable(self):
- with self._lock:
- tIndex = self.tableNumQueue.push()
- return tIndex
-
- # Not used any more, to delete
- def releaseTable(self, i): # return the table back, so others can use it
- self.tableNumQueue.release(i)
-
- # TODO: not used any more, delete
- def getTableNameToDelete(self):
- tblNum = self.tableNumQueue.pop() # TODO: race condition!
- if (not tblNum): # maybe false
- return False
-
- return "table_{}".format(tblNum)
-
- def cleanUp(self):
- self._dbConn.close()
-
class TaskExecutor():
class BoundedList:
def __init__(self, size=10):
@@ -1584,10 +1165,10 @@ class TaskExecutor():
self._boundedList.add(n)
# def logInfo(self, msg):
- # logger.info(" T[{}.x]: ".format(self._curStep) + msg)
+ # Logging.info(" T[{}.x]: ".format(self._curStep) + msg)
# def logDebug(self, msg):
- # logger.debug(" T[{}.x]: ".format(self._curStep) + msg)
+ # Logging.debug(" T[{}.x]: ".format(self._curStep) + msg)
class Task():
@@ -1600,19 +1181,19 @@ class Task():
@classmethod
def allocTaskNum(cls):
Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
- # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
+ # Logging.debug("Allocating taskSN: {}".format(Task.taskSn))
return Task.taskSn
def __init__(self, execStats: ExecutionStats, db: Database):
self._workerThread = None
- self._err = None # type: Exception
+ self._err: Optional[Exception] = None
self._aborted = False
self._curStep = None
self._numRows = None # Number of rows affected
# Assign an incremental task serial number
self._taskNum = self.allocTaskNum()
- # logger.debug("Creating new task {}...".format(self._taskNum))
+ # Logging.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats
self._db = db # A task is always associated/for a specific DB
@@ -1649,11 +1230,12 @@ class Task():
if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY
0x0B, # Unable to establish connection, more details in TD-1648
- # 0x200, # invalid SQL, TODO: re-examine with TD-934
+ 0x200, # invalid SQL, TODO: re-examine with TD-934
0x217, # "db not selected", client side defined error code
- 0x218, # "Table does not exist" client side defined error code
- 0x360, 0x362,
- 0x369, # tag already exists
+ # 0x218, # "Table does not exist" client side defined error code
+ 0x360, # Table already exists
+ 0x362,
+ # 0x369, # tag already exists
0x36A, 0x36B, 0x36D,
0x381,
0x380, # "db not selected"
@@ -1662,12 +1244,17 @@ class Task():
0x503,
0x510, # vnode not in ready state
0x14, # db not ready, errno changed
- 0x600,
+ 0x600, # Invalid table ID, why?
1000 # REST catch-all error
]:
return True # These are the ALWAYS-ACCEPTABLE ones
- elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
- return True # We may get "network unavilable" when restarting service
+ # This case is handled below already.
+ # elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
+ # return True # We may get "network unavailable" when restarting service
+ elif gConfig.ignore_errors: # something is specified on command line
+ moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
+ if errno in moreErrnos:
+ return True
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
if msg.find("invalid column name") != -1:
return True
@@ -1675,8 +1262,8 @@ class Task():
return True
elif msg.find("duplicated column names") != -1: # also alter table tag issues
return True
- elif (gSvcMgr!=None) and gSvcMgr.isRestarting():
- logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
+ elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ...
+ Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
return True
return False # Not an acceptable error
@@ -1735,10 +1322,11 @@ class Task():
self._aborted = True
traceback.print_exc()
except BaseException: # TODO: what is this again??!!
- self.logDebug(
- "[=] Unexpected exception, SQL: {}".format(
- wt.getDbConn().getLastSql()))
- raise
+ raise RuntimeError("Punt")
+ # self.logDebug(
+ # "[=] Unexpected exception, SQL: {}".format(
+ # wt.getDbConn().getLastSql()))
+ # raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format(
@@ -1817,14 +1405,14 @@ class ExecutionStats:
self._failureReason = reason
def printStats(self):
- logger.info(
+ Logging.info(
"----------------------------------------------------------------------")
- logger.info(
+ Logging.info(
"| Crash_Gen test {}, with the following stats:". format(
"FAILED (reason: {})".format(
self._failureReason) if self._failed else "SUCCEEDED"))
- logger.info("| Task Execution Times (success/total):")
- execTimesAny = 0
+ Logging.info("| Task Execution Times (success/total):")
+ execTimesAny = 0.0
for k, n in self._execTimes.items():
execTimesAny += n[0]
errStr = None
@@ -1834,28 +1422,28 @@ class ExecutionStats:
errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()]
# print("error strings = {}".format(errStrs))
errStr = ", ".join(errStrs)
- logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))
+ Logging.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))
- logger.info(
+ Logging.info(
"| Total Tasks Executed (success or not): {} ".format(execTimesAny))
- logger.info(
+ Logging.info(
"| Total Tasks In Progress at End: {}".format(
self._tasksInProgress))
- logger.info(
+ Logging.info(
"| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
self._accRunTime))
- logger.info(
+ Logging.info(
"| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime / execTimesAny))
- logger.info(
+ Logging.info(
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
self._elapsedTime))
- logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
- logger.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
- logger.info("| Longest native query time: {:.3f} seconds, started: {}".
+ Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
+ Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
+ Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
format(MyTDSql.longestQueryTime,
time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
- logger.info("| Longest native query: {}".format(MyTDSql.longestQuery))
- logger.info(
+ Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
+ Logging.info(
"----------------------------------------------------------------------")
@@ -1865,11 +1453,14 @@ class StateTransitionTask(Task):
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
+ _baseTableNumber = None
+
+ _endState = None
+
@classmethod
def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected")
-
- _endState = None
+
@classmethod
def getEndState(cls): # TODO: optimize by calling it fewer times
raise RuntimeError("Overriding method expected")
@@ -1889,7 +1480,10 @@ class StateTransitionTask(Task):
@classmethod
def getRegTableName(cls, i):
- return "reg_table_{}".format(i)
+ if ( StateTransitionTask._baseTableNumber is None):
+ StateTransitionTask._baseTableNumber = Dice.throw(
+ 999) if gConfig.dynamic_db_table_names else 0
+ return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
def execute(self, wt: WorkerThread):
super().execute(wt)
@@ -1909,7 +1503,8 @@ class TaskCreateDb(StateTransitionTask):
# was: self.execWtSql(wt, "create database db")
repStr = ""
if gConfig.max_replicas != 1:
- numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
+ # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
+ numReplica = gConfig.max_replicas # fixed, always
repStr = "replica {}".format(numReplica)
self.execWtSql(wt, "create database {} {}"
.format(self._db.getName(), repStr) )
@@ -1925,7 +1520,7 @@ class TaskDropDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
- logger.debug("[OPS] database dropped at {}".format(time.time()))
+ Logging.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask):
@classmethod
@@ -1938,7 +1533,7 @@ class TaskCreateSuperTable(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not self._db.exists(wt.getDbConn()):
- logger.debug("Skipping task, no DB yet")
+ Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
@@ -1973,7 +1568,7 @@ class TdSuperTable:
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
- logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
+ Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise
qr = dbc.getQueryResult()
@@ -2088,7 +1683,7 @@ class TaskReadData(StateTransitionTask):
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
- logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
+ Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise
class TaskDropSuperTable(StateTransitionTask):
@@ -2119,7 +1714,7 @@ class TaskDropSuperTable(StateTransitionTask):
errno2 = Helper.convertErrno(err.errno)
if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False
- logger.debug("[DB] Acceptable error when dropping a table")
+ Logging.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table
if (not tickOutput):
@@ -2199,20 +1794,19 @@ class TaskAddData(StateTransitionTask):
# Track which table is being actively worked on
activeTable: Set[int] = set()
- # We use these two files to record operations to DB, useful for power-off
- # tests
- fAddLogReady = None
- fAddLogDone = None
+ # We use these two files to record operations to DB, useful for power-off tests
+ fAddLogReady = None # type: TextIOWrapper
+ fAddLogDone = None # type: TextIOWrapper
@classmethod
def prepToRecordOps(cls):
if gConfig.record_ops:
if (cls.fAddLogReady is None):
- logger.info(
+ Logging.info(
"Recording in a file operations to be performed...")
cls.fAddLogReady = open("add_log_ready.txt", "w")
if (cls.fAddLogDone is None):
- logger.info("Recording in a file operations completed...")
+ Logging.info("Recording in a file operations completed...")
cls.fAddLogDone = open("add_log_done.txt", "w")
@classmethod
@@ -2288,490 +1882,8 @@ class TaskAddData(StateTransitionTask):
self.activeTable.discard(i) # not raising an error, unlike remove
-# Deterministic random number generator
-class Dice():
- seeded = False # static, uninitialized
-
- @classmethod
- def seed(cls, s): # static
- if (cls.seeded):
- raise RuntimeError(
- "Cannot seed the random generator more than once")
- cls.verifyRNG()
- random.seed(s)
- cls.seeded = True # TODO: protect against multi-threading
-
- @classmethod
- def verifyRNG(cls): # Verify that the RNG is determinstic
- random.seed(0)
- x1 = random.randrange(0, 1000)
- x2 = random.randrange(0, 1000)
- x3 = random.randrange(0, 1000)
- if (x1 != 864 or x2 != 394 or x3 != 776):
- raise RuntimeError("System RNG is not deterministic")
-
- @classmethod
- def throw(cls, stop): # get 0 to stop-1
- return cls.throwRange(0, stop)
-
- @classmethod
- def throwRange(cls, start, stop): # up to stop-1
- if (not cls.seeded):
- raise RuntimeError("Cannot throw dice before seeding it")
- return random.randrange(start, stop)
-
- @classmethod
- def choice(cls, cList):
- return random.choice(cList)
-
-
-class LoggingFilter(logging.Filter):
- def filter(self, record: logging.LogRecord):
- if (record.levelno >= logging.INFO):
- return True # info or above always log
-
- # Commenting out below to adjust...
-
- # if msg.startswith("[TRD]"):
- # return False
- return True
-
-
-class MyLoggingAdapter(logging.LoggerAdapter):
- def process(self, msg, kwargs):
- return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
- # return '[%s] %s' % (self.extra['connid'], msg), kwargs
-
-
-class SvcManager:
- def __init__(self):
- print("Starting TDengine Service Manager")
- # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
- # signal.signal(signal.SIGINT, self.sigIntHandler)
- # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
-
- self.inSigHandler = False
- # self._status = MainExec.STATUS_RUNNING # set inside
- # _startTaosService()
- self.svcMgrThread = None # type: ServiceManagerThread
- self._lock = threading.Lock()
- self._isRestarting = False
-
- def _doMenu(self):
- choice = ""
- while True:
- print("\nInterrupting Service Program, Choose an Action: ")
- print("1: Resume")
- print("2: Terminate")
- print("3: Restart")
- # Remember to update the if range below
- # print("Enter Choice: ", end="", flush=True)
- while choice == "":
- choice = input("Enter Choice: ")
- if choice != "":
- break # done with reading repeated input
- if choice in ["1", "2", "3"]:
- break # we are done with whole method
- print("Invalid choice, please try again.")
- choice = "" # reset
- return choice
-
- def sigUsrHandler(self, signalNumber, frame):
- print("Interrupting main thread execution upon SIGUSR1")
- if self.inSigHandler: # already
- print("Ignoring repeated SIG...")
- return # do nothing if it's already not running
- self.inSigHandler = True
-
- choice = self._doMenu()
- if choice == "1":
- # TODO: can the sub-process be blocked due to us not reading from
- # queue?
- self.sigHandlerResume()
- elif choice == "2":
- self.stopTaosService()
- elif choice == "3": # Restart
- self.restart()
- else:
- raise RuntimeError("Invalid menu choice: {}".format(choice))
-
- self.inSigHandler = False
-
- def sigIntHandler(self, signalNumber, frame):
- print("SvcManager: INT Signal Handler starting...")
- if self.inSigHandler:
- print("Ignoring repeated SIG_INT...")
- return
- self.inSigHandler = True
-
- self.stopTaosService()
- print("SvcManager: INT Signal Handler returning...")
- self.inSigHandler = False
-
- def sigHandlerResume(self):
- print("Resuming TDengine service manager thread (main thread)...\n\n")
-
- def _checkServiceManagerThread(self):
- if self.svcMgrThread: # valid svc mgr thread
- if self.svcMgrThread.isStopped(): # done?
- self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
- self.svcMgrThread = None # no more
-
- def _procIpcAll(self):
- while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
- if self.isRunning():
- self.svcMgrThread.procIpcBatch() # regular processing,
- self._checkServiceManagerThread()
- elif self.isRetarting():
- print("Service restarting...")
- time.sleep(0.5) # pause, before next round
- print(
- "Service Manager Thread (with subprocess) has ended, main thread now exiting...")
-
- def startTaosService(self):
- with self._lock:
- if self.svcMgrThread:
- raise RuntimeError("Cannot start TAOS service when one may already be running")
-
- # Find if there's already a taosd service, and then kill it
- for proc in psutil.process_iter():
- if proc.name() == 'taosd':
- print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
- time.sleep(2.0)
- proc.kill()
- # print("Process: {}".format(proc.name()))
-
-
- self.svcMgrThread = ServiceManagerThread() # create the object
- print("Attempting to start TAOS service started, printing out output...")
- self.svcMgrThread.start()
- self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
- print("TAOS service started")
-
- def stopTaosService(self, outputLines=20):
- with self._lock:
- if not self.isRunning():
- logger.warning("Cannot stop TAOS service, not running")
- return
-
- print("Terminating Service Manager Thread (SMT) execution...")
- self.svcMgrThread.stop()
- if self.svcMgrThread.isStopped():
- self.svcMgrThread.procIpcBatch(outputLines) # one last time
- self.svcMgrThread = None
- print("End of TDengine Service Output")
- print("----- TDengine Service (managed by SMT) is now terminated -----\n")
- else:
- print("WARNING: SMT did not terminate as expected")
-
- def run(self):
- self.startTaosService()
- self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
- if self.isRunning(): # if sig handler hasn't destroyed it by now
- self.stopTaosService() # should have started already
-
- def restart(self):
- if self._isRestarting:
- logger.warning("Cannot restart service when it's already restarting")
- return
-
- self._isRestarting = True
- if self.isRunning():
- self.stopTaosService()
- else:
- logger.warning("Service not running when restart requested")
-
- self.startTaosService()
- self._isRestarting = False
-
- def isRunning(self):
- return self.svcMgrThread != None
-
- def isRestarting(self):
- return self._isRestarting
-
-class ServiceManagerThread:
- MAX_QUEUE_SIZE = 10000
-
- def __init__(self):
- self._tdeSubProcess = None # type: TdeSubProcess
- self._thread = None
- self._status = None
-
- def getStatus(self):
- return self._status
-
- def isRunning(self):
- # return self._thread and self._thread.is_alive()
- return self._status == MainExec.STATUS_RUNNING
-
- def isStopping(self):
- return self._status == MainExec.STATUS_STOPPING
-
- def isStopped(self):
- return self._status == MainExec.STATUS_STOPPED
-
- # Start the thread (with sub process), and wait for the sub service
- # to become fully operational
- def start(self):
- if self._thread:
- raise RuntimeError("Unexpected _thread")
- if self._tdeSubProcess:
- raise RuntimeError("TDengine sub process already created/running")
-
- self._status = MainExec.STATUS_STARTING
-
- self._tdeSubProcess = TdeSubProcess()
- self._tdeSubProcess.start()
-
- self._ipcQueue = Queue()
- self._thread = threading.Thread( # First thread captures server OUTPUT
- target=self.svcOutputReader,
- args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
- self._thread.daemon = True # thread dies with the program
- self._thread.start()
-
- self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
- target=self.svcErrorReader,
- args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
- self._thread2.daemon = True # thread dies with the program
- self._thread2.start()
-
- # wait for service to start
- for i in range(0, 100):
- time.sleep(1.0)
- # self.procIpcBatch() # don't pump message during start up
- print("_zz_", end="", flush=True)
- if self._status == MainExec.STATUS_RUNNING:
- logger.info("[] TDengine service READY to process requests")
- return # now we've started
- # TODO: handle this better?
- self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
- raise RuntimeError("TDengine service did not start successfully")
-
- def stop(self):
- # can be called from both main thread or signal handler
- print("Terminating TDengine service running as the sub process...")
- if self.isStopped():
- print("Service already stopped")
- return
- if self.isStopping():
- print("Service is already being stopped")
- return
- # Linux will send Control-C generated SIGINT to the TDengine process
- # already, ref:
- # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
- if not self._tdeSubProcess:
- raise RuntimeError("sub process object missing")
-
- self._status = MainExec.STATUS_STOPPING
- retCode = self._tdeSubProcess.stop()
- print("Attempted to stop sub process, got return code: {}".format(retCode))
- if (retCode==-11): # SGV
- logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
-
- if self._tdeSubProcess.isRunning(): # still running
- print("FAILED to stop sub process, it is still running... pid = {}".format(
- self._tdeSubProcess.getPid()))
- else:
- self._tdeSubProcess = None # not running any more
- self.join() # stop the thread, change the status, etc.
-
- def join(self):
- # TODO: sanity check
- if not self.isStopping():
- raise RuntimeError(
- "Unexpected status when ending svc mgr thread: {}".format(
- self._status))
-
- if self._thread:
- self._thread.join()
- self._thread = None
- self._status = MainExec.STATUS_STOPPED
- # STD ERR thread
- self._thread2.join()
- self._thread2 = None
- else:
- print("Joining empty thread, doing nothing")
-
- def _trimQueue(self, targetSize):
- if targetSize <= 0:
- return # do nothing
- q = self._ipcQueue
- if (q.qsize() <= targetSize): # no need to trim
- return
-
- logger.debug("Triming IPC queue to target size: {}".format(targetSize))
- itemsToTrim = q.qsize() - targetSize
- for i in range(0, itemsToTrim):
- try:
- q.get_nowait()
- except Empty:
- break # break out of for loop, no more trimming
-
- TD_READY_MSG = "TDengine is initialized successfully"
-
- def procIpcBatch(self, trimToTarget=0, forceOutput=False):
- self._trimQueue(trimToTarget) # trim if necessary
- # Process all the output generated by the underlying sub process,
- # managed by IO thread
- print("<", end="", flush=True)
- while True:
- try:
- line = self._ipcQueue.get_nowait() # getting output at fast speed
- self._printProgress("_o")
- except Empty:
- # time.sleep(2.3) # wait only if there's no output
- # no more output
- print(".>", end="", flush=True)
- return # we are done with THIS BATCH
- else: # got line, printing out
- if forceOutput:
- logger.info(line)
- else:
- logger.debug(line)
- print(">", end="", flush=True)
-
- _ProgressBars = ["--", "//", "||", "\\\\"]
-
- def _printProgress(self, msg): # TODO: assuming 2 chars
- print(msg, end="", flush=True)
- pBar = self._ProgressBars[Dice.throw(4)]
- print(pBar, end="", flush=True)
- print('\b\b\b\b', end="", flush=True)
-
- def svcOutputReader(self, out: IO, queue):
- # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
- # print("This is the svcOutput Reader...")
- # for line in out :
- for line in iter(out.readline, b''):
- # print("Finished reading a line: {}".format(line))
- # print("Adding item to queue...")
- try:
- line = line.decode("utf-8").rstrip()
- except UnicodeError:
- print("\nNon-UTF8 server output: {}\n".format(line))
-
- # This might block, and then causing "out" buffer to block
- queue.put(line)
- self._printProgress("_i")
-
- if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started
- if line.find(self.TD_READY_MSG) != -1: # found
- logger.info("Waiting for the service to become FULLY READY")
- time.sleep(1.0) # wait for the server to truly start. TODO: remove this
- logger.info("Service is now FULLY READY")
- self._status = MainExec.STATUS_RUNNING
-
- # Trim the queue if necessary: TODO: try this 1 out of 10 times
- self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
-
- if self.isStopping(): # TODO: use thread status instead
- # WAITING for stopping sub process to finish its outptu
- print("_w", end="", flush=True)
-
- # queue.put(line)
- # meaning sub process must have died
- print("\nNo more output from IO thread managing TDengine service")
- out.close()
-
- def svcErrorReader(self, err: IO, queue):
- for line in iter(err.readline, b''):
- print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
-
-class TdeSubProcess:
- def __init__(self):
- self.subProcess = None
-
- def getStdOut(self):
- return self.subProcess.stdout
-
- def getStdErr(self):
- return self.subProcess.stderr
-
- def isRunning(self):
- return self.subProcess is not None
-
- def getPid(self):
- return self.subProcess.pid
-
- def getBuildPath(self):
- selfPath = os.path.dirname(os.path.realpath(__file__))
- if ("community" in selfPath):
- projPath = selfPath[:selfPath.find("communit")]
- else:
- projPath = selfPath[:selfPath.find("tests")]
- for root, dirs, files in os.walk(projPath):
- if ("taosd" in files):
- rootRealPath = os.path.dirname(os.path.realpath(root))
- if ("packaging" not in rootRealPath):
- buildPath = root[:len(root) - len("/build/bin")]
- break
- return buildPath
-
- def start(self):
- ON_POSIX = 'posix' in sys.builtin_module_names
-
- taosdPath = self.getBuildPath() + "/build/bin/taosd"
- cfgPath = self.getBuildPath() + "/test/cfg"
-
- # Delete the log files
- logPath = self.getBuildPath() + "/test/log"
- # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
- # filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ]
- # for f in filelist:
- # filePath = os.path.join(logPath, f)
- # print("Removing log file: {}".format(filePath))
- # os.remove(filePath)
- if os.path.exists(logPath):
- logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
- logger.info("Saving old log files to: {}".format(logPathSaved))
- os.rename(logPath, logPathSaved)
- # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
-
- svcCmd = [taosdPath, '-c', cfgPath]
- # svcCmdSingle = "{} -c {}".format(taosdPath, cfgPath)
- # svcCmd = ['vmstat', '1']
- if self.subProcess: # already there
- raise RuntimeError("Corrupt process state")
-
- # print("Starting service: {}".format(svcCmd))
- self.subProcess = subprocess.Popen(
- svcCmd, shell=False,
- # svcCmdSingle, shell=True, # capture core dump?
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # bufsize=1, # not supported in binary mode
- close_fds=ON_POSIX
- ) # had text=True, which interferred with reading EOF
-
- def stop(self):
- if not self.subProcess:
- print("Sub process already stopped")
- return -1
-
- retCode = self.subProcess.poll() # contains real sub process return code
- if retCode: # valid return code, process ended
- self.subProcess = None
- else: # process still alive, let's interrupt it
- print(
- "Sub process is running, sending SIG_INT and waiting for it to terminate...")
- # sub process should end, then IPC queue should end, causing IO
- # thread to end
- self.subProcess.send_signal(signal.SIGINT)
- try:
- self.subProcess.wait(10)
- retCode = self.subProcess.returncode
- except subprocess.TimeoutExpired as err:
- print("Time out waiting for TDengine service process to exit")
- retCode = -3
- else:
- print("TDengine service process terminated successfully from SIG_INT")
- retCode = -4
- self.subProcess = None
- return retCode
class ThreadStacks: # stack info for all threads
def __init__(self):
@@ -2808,17 +1920,17 @@ class ClientManager:
# signal.signal(signal.SIGTERM, self.sigIntHandler)
# signal.signal(signal.SIGINT, self.sigIntHandler)
- self._status = MainExec.STATUS_RUNNING
+ self._status = Status.STATUS_RUNNING
self.tc = None
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
- if self._status != MainExec.STATUS_RUNNING:
+ if self._status != Status.STATUS_RUNNING:
print("Repeated SIGINT received, forced exit...")
# return # do nothing if it's already not running
sys.exit(-1)
- self._status = MainExec.STATUS_STOPPING # immediately set our status
+ self._status = Status.STATUS_STOPPING # immediately set our status
print("ClientManager: Terminating program...")
self.tc.requestToStop()
@@ -2898,15 +2010,20 @@ class ClientManager:
# self._printLastNumbers()
global gConfig
- dbManager = DbManager() # Regular function
+ # Prepare Tde Instance
+ global gContainer
+ tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
+
+ dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
+ print("Starting client instance to: {}".format(tInst))
self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
if svcMgr: # gConfig.auto_start_service:
- svcMgr.stopTaosService()
+ svcMgr.stopTaosServices()
svcMgr = None
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
@@ -2936,18 +2053,10 @@ class ClientManager:
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
self.tc.printStats()
-
-
-
class MainExec:
- STATUS_STARTING = 1
- STATUS_RUNNING = 2
- STATUS_STOPPING = 3
- STATUS_STOPPED = 4
-
def __init__(self):
self._clientMgr = None
- self._svcMgr = None
+ self._svcMgr = None # type: ServiceManager
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
@@ -2960,219 +2069,185 @@ class MainExec:
self._svcMgr.sigUsrHandler(signalNumber, frame)
def sigIntHandler(self, signalNumber, frame):
- if self._svcMgr:
+ if self._svcMgr:
self._svcMgr.sigIntHandler(signalNumber, frame)
- if self._clientMgr:
+ if self._clientMgr:
self._clientMgr.sigIntHandler(signalNumber, frame)
def runClient(self):
global gSvcMgr
if gConfig.auto_start_service:
- self._svcMgr = SvcManager()
- gSvcMgr = self._svcMgr # hack alert
- self._svcMgr.startTaosService() # we start, don't run
+ gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
+ gSvcMgr.startTaosServices() # we start, don't run
self._clientMgr = ClientManager()
ret = None
try:
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err:
- logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
+ Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
# don't raise
return ret
def runService(self):
global gSvcMgr
- self._svcMgr = SvcManager()
- gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert
-
- self._svcMgr.run() # run to some end state
- self._svcMgr = None
- gSvcMgr = None
-
- def runTemp(self): # for debugging purposes
- # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
- # dbc = dbState.getDbConn()
- # sTbName = dbState.getFixedSuperTableName()
- # dbc.execute("create database if not exists db")
- # if not dbState.getState().equals(StateEmpty()):
- # dbc.execute("use db")
-
- # rTables = None
- # try: # the super table may not exist
- # sql = "select TBNAME from db.{}".format(sTbName)
- # logger.info("Finding out tables in super table: {}".format(sql))
- # dbc.query(sql) # TODO: analyze result set later
- # logger.info("Fetching result")
- # rTables = dbc.getQueryResult()
- # logger.info("Result: {}".format(rTables))
- # except taos.error.ProgrammingError as err:
- # logger.info("Initial Super table OPS error: {}".format(err))
-
- # # sys.exit()
- # if ( not rTables == None):
- # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
- # try:
- # for rTbName in rTables : # regular tables
- # ds = dbState
- # logger.info("Inserting into table: {}".format(rTbName[0]))
- # sql = "insert into db.{} values ('{}', {});".format(
- # rTbName[0],
- # ds.getNextTick(), ds.getNextInt())
- # dbc.execute(sql)
- # for rTbName in rTables : # regular tables
- # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
- # logger.info("Initial READING operation is successful")
- # except taos.error.ProgrammingError as err:
- # logger.info("Initial WRITE/READ error: {}".format(err))
-
- # Sandbox testing code
- # dbc = dbState.getDbConn()
- # while True:
- # rows = dbc.query("show databases")
- # print("Rows: {}, time={}".format(rows, time.time()))
- return
-
-
-def main():
- # Super cool Python argument library:
- # https://docs.python.org/3/library/argparse.html
- parser = argparse.ArgumentParser(
- formatter_class=argparse.RawDescriptionHelpFormatter,
- description=textwrap.dedent('''\
- TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
- ---------------------------------------------------------------------
- 1. You build TDengine in the top level ./build directory, as described in offical docs
- 2. You run the server there before this script: ./build/bin/taosd -c test/cfg
-
- '''))
-
- # parser.add_argument('-a', '--auto-start-service', action='store_true',
- # help='Automatically start/stop the TDengine service (default: false)')
- # parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
- # help='Connector type to use: native, rest, or mixed (default: 10)')
- # parser.add_argument('-d', '--debug', action='store_true',
- # help='Turn on DEBUG mode for more logging (default: false)')
- # parser.add_argument('-e', '--run-tdengine', action='store_true',
- # help='Run TDengine service in foreground (default: false)')
- # parser.add_argument('-l', '--larger-data', action='store_true',
- # help='Write larger amount of data during write operations (default: false)')
- # parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
- # help='Use a single shared db connection (default: false)')
- # parser.add_argument('-r', '--record-ops', action='store_true',
- # help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
- # parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
- # help='Maximum number of steps to run (default: 100)')
- # parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
- # help='Number of threads to run (default: 10)')
- # parser.add_argument('-x', '--continue-on-exception', action='store_true',
- # help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
-
- parser.add_argument(
- '-a',
- '--auto-start-service',
- action='store_true',
- help='Automatically start/stop the TDengine service (default: false)')
- parser.add_argument(
- '-b',
- '--max-dbs',
- action='store',
- default=0,
- type=int,
- help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
- parser.add_argument(
- '-c',
- '--connector-type',
- action='store',
- default='native',
- type=str,
- help='Connector type to use: native, rest, or mixed (default: 10)')
- parser.add_argument(
- '-d',
- '--debug',
- action='store_true',
- help='Turn on DEBUG mode for more logging (default: false)')
- parser.add_argument(
- '-e',
- '--run-tdengine',
- action='store_true',
- help='Run TDengine service in foreground (default: false)')
- parser.add_argument(
- '-i',
- '--max-replicas',
- action='store',
- default=1,
- type=int,
- help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
- parser.add_argument(
- '-l',
- '--larger-data',
- action='store_true',
- help='Write larger amount of data during write operations (default: false)')
- parser.add_argument(
- '-p',
- '--per-thread-db-connection',
- action='store_true',
- help='Use a single shared db connection (default: false)')
- parser.add_argument(
- '-r',
- '--record-ops',
- action='store_true',
- help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
- parser.add_argument(
- '-s',
- '--max-steps',
- action='store',
- default=1000,
- type=int,
- help='Maximum number of steps to run (default: 100)')
- parser.add_argument(
- '-t',
- '--num-threads',
- action='store',
- default=5,
- type=int,
- help='Number of threads to run (default: 10)')
- parser.add_argument(
- '-v',
- '--verify-data',
- action='store_true',
- help='Verify data written in a number of places by reading back (default: false)')
- parser.add_argument(
- '-x',
- '--continue-on-exception',
- action='store_true',
- help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
-
- global gConfig
- gConfig = parser.parse_args()
-
- # Logging Stuff
- global logger
- _logger = logging.getLogger('CrashGen') # real logger
- _logger.addFilter(LoggingFilter())
- ch = logging.StreamHandler()
- _logger.addHandler(ch)
-
- # Logging adapter, to be used as a logger
- logger = MyLoggingAdapter(_logger, [])
-
- if (gConfig.debug):
- logger.setLevel(logging.DEBUG) # default seems to be INFO
- else:
- logger.setLevel(logging.INFO)
-
- Dice.seed(0) # initial seeding of dice
-
- # Run server or client
- mExec = MainExec()
- if gConfig.run_tdengine: # run server
- mExec.runService()
- else:
- return mExec.runClient()
-
-
-if __name__ == "__main__":
- exitCode = main()
- # print("Exiting with code: {}".format(exitCode))
- sys.exit(exitCode)
+ gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert
+
+ gSvcMgr.run() # run to some end state
+ gSvcMgr = self._svcMgr = None
+
+ def init(self): # TODO: refactor
+ global gContainer
+ gContainer = Container() # micky-mouse DI
+
+ global gSvcMgr # TODO: refactor away
+ gSvcMgr = None
+
+ # Super cool Python argument library:
+ # https://docs.python.org/3/library/argparse.html
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=textwrap.dedent('''\
+ TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
+ ---------------------------------------------------------------------
+ 1. You build TDengine in the top level ./build directory, as described in official docs
+ 2. You run the server there before this script: ./build/bin/taosd -c test/cfg
+
+ '''))
+
+ parser.add_argument(
+ '-a',
+ '--auto-start-service',
+ action='store_true',
+ help='Automatically start/stop the TDengine service (default: false)')
+ parser.add_argument(
+ '-b',
+ '--max-dbs',
+ action='store',
+ default=0,
+ type=int,
+ help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
+ parser.add_argument(
+ '-c',
+ '--connector-type',
+ action='store',
+ default='native',
+ type=str,
+ help='Connector type to use: native, rest, or mixed (default: native)')
+ parser.add_argument(
+ '-d',
+ '--debug',
+ action='store_true',
+ help='Turn on DEBUG mode for more logging (default: false)')
+ parser.add_argument(
+ '-e',
+ '--run-tdengine',
+ action='store_true',
+ help='Run TDengine service in foreground (default: false)')
+ parser.add_argument(
+ '-g',
+ '--ignore-errors',
+ action='store',
+ default=None,
+ type=str,
+ help='Ignore error codes, comma separated, 0x supported (default: None)')
+ parser.add_argument(
+ '-i',
+ '--max-replicas',
+ action='store',
+ default=1,
+ type=int,
+ help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
+ parser.add_argument(
+ '-l',
+ '--larger-data',
+ action='store_true',
+ help='Write larger amount of data during write operations (default: false)')
+ parser.add_argument(
+ '-n',
+ '--dynamic-db-table-names',
+ action='store_true',
+ help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)')
+ parser.add_argument(
+ '-o',
+ '--num-dnodes',
+ action='store',
+ default=1,
+ type=int,
+ help='Number of Dnodes to initialize, used with -e option. (default: 1)')
+ parser.add_argument(
+ '-p',
+ '--per-thread-db-connection',
+ action='store_true',
+ help='Use a single shared db connection (default: false)')
+ parser.add_argument(
+ '-r',
+ '--record-ops',
+ action='store_true',
+ help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
+ parser.add_argument(
+ '-s',
+ '--max-steps',
+ action='store',
+ default=1000,
+ type=int,
+ help='Maximum number of steps to run (default: 1000)')
+ parser.add_argument(
+ '-t',
+ '--num-threads',
+ action='store',
+ default=5,
+ type=int,
+ help='Number of threads to run (default: 5)')
+ parser.add_argument(
+ '-v',
+ '--verify-data',
+ action='store_true',
+ help='Verify data written in a number of places by reading back (default: false)')
+ parser.add_argument(
+ '-x',
+ '--continue-on-exception',
+ action='store_true',
+ help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
+
+ global gConfig
+ gConfig = parser.parse_args()
+
+ Logging.clsInit(gConfig)
+
+ Dice.seed(0) # initial seeding of dice
+
+ def run(self):
+ if gConfig.run_tdengine: # run server
+ try:
+ self.runService()
+ return 0 # success
+ except ConnectionError as err:
+ Logging.error("Failed to make DB connection, please check DB instance manually")
+ return -1 # failure
+ else:
+ return self.runClient()
+
+
+class Container():
+ _propertyList = {'defTdeInstance'}
+
+ def __init__(self):
+ self._cargo = {} # No cargo at the beginning
+
+ def _verifyValidProperty(self, name):
+ if not name in self._propertyList:
+ raise CrashGenError("Invalid container property: {}".format(name))
+
+ # Called for an attribute, when other mechanisms fail (compare to __getattribute__)
+ def __getattr__(self, name):
+ self._verifyValidProperty(name)
+ return self._cargo[name] # just a simple lookup
+
+ def __setattr__(self, name, value):
+ if name == '_cargo' : # reserved vars
+ super().__setattr__(name, value)
+ return
+ self._verifyValidProperty(name)
+ self._cargo[name] = value
+
diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py
new file mode 100644
index 0000000000000000000000000000000000000000..43c855647c03d1de3e55393eb85c77250a00a602
--- /dev/null
+++ b/tests/pytest/crash_gen/db.py
@@ -0,0 +1,435 @@
+from __future__ import annotations
+
+import sys
+import time
+import threading
+import requests
+from requests.auth import HTTPBasicAuth
+
+import taos
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.log import *
+
+from .misc import Logging, CrashGenError, Helper, Dice
+import os
+import datetime
+# from .service_manager import TdeInstance
+
+class DbConn:
+ TYPE_NATIVE = "native-c"
+ TYPE_REST = "rest-api"
+ TYPE_INVALID = "invalid"
+
+ @classmethod
+ def create(cls, connType, dbTarget):
+ if connType == cls.TYPE_NATIVE:
+ return DbConnNative(dbTarget)
+ elif connType == cls.TYPE_REST:
+ return DbConnRest(dbTarget)
+ else:
+ raise RuntimeError(
+ "Unexpected connection type: {}".format(connType))
+
+ @classmethod
+ def createNative(cls, dbTarget) -> DbConn:
+ return cls.create(cls.TYPE_NATIVE, dbTarget)
+
+ @classmethod
+ def createRest(cls, dbTarget) -> DbConn:
+ return cls.create(cls.TYPE_REST, dbTarget)
+
+ def __init__(self, dbTarget):
+ self.isOpen = False
+ self._type = self.TYPE_INVALID
+ self._lastSql = None
+ self._dbTarget = dbTarget
+
+ def __repr__(self):
+ return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
+
+ def getLastSql(self):
+ return self._lastSql
+
+ def open(self):
+ if (self.isOpen):
+ raise RuntimeError("Cannot re-open an existing DB connection")
+
+ # below implemented by child classes
+ self.openByType()
+
+ Logging.debug("[DB] data connection opened: {}".format(self))
+ self.isOpen = True
+
+ def close(self):
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def queryScalar(self, sql) -> int:
+ return self._queryAny(sql)
+
+ def queryString(self, sql) -> str:
+ return self._queryAny(sql)
+
+ def _queryAny(self, sql): # actual query result as an int
+ if (not self.isOpen):
+ raise RuntimeError("Cannot query database until connection is open")
+ nRows = self.query(sql)
+ if nRows != 1:
+ raise taos.error.ProgrammingError(
+ "Unexpected result for query: {}, rows = {}".format(sql, nRows),
+ (0x991 if nRows==0 else 0x992)
+ )
+ if self.getResultRows() != 1 or self.getResultCols() != 1:
+ raise RuntimeError("Unexpected result set for query: {}".format(sql))
+ return self.getQueryResult()[0][0]
+
+ def use(self, dbName):
+ self.execute("use {}".format(dbName))
+
+ def existsDatabase(self, dbName: str):
+ ''' Check if a certain database exists '''
+ self.query("show databases")
+ dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
+ # ret2 = dbName in dbs
+ # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
+ return dbName in dbs # TODO: super weird type mangling seen, once here
+
+ def hasTables(self):
+ return self.query("show tables") > 0
+
+ def execute(self, sql):
+ ''' Return the number of rows affected'''
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def safeExecute(self, sql):
+ '''Safely execute any SQL query, returning True/False upon success/failure'''
+ try:
+ self.execute(sql)
+ return True # ignore num of results, return success
+ except taos.error.ProgrammingError as err:
+ return False # failed, for whatever TAOS reason
+        # Not possible to reach here, non-TAOS exception would have been thrown
+
+ def query(self, sql) -> int: # return num rows returned
+ ''' Return the number of rows affected'''
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def openByType(self):
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def getQueryResult(self):
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def getResultRows(self):
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+ def getResultCols(self):
+ raise RuntimeError("Unexpected execution, should be overriden")
+
+# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
+
+
+class DbConnRest(DbConn):
+ REST_PORT_INCREMENT = 11
+
+ def __init__(self, dbTarget: DbTarget):
+ super().__init__(dbTarget)
+ self._type = self.TYPE_REST
+ restPort = dbTarget.port + 11
+ self._url = "http://{}:{}/rest/sql".format(
+ dbTarget.hostAddr, dbTarget.port + self.REST_PORT_INCREMENT)
+ self._result = None
+
+ def openByType(self): # Open connection
+ pass # do nothing, always open
+
+ def close(self):
+ if (not self.isOpen):
+ raise RuntimeError("Cannot clean up database until connection is open")
+ # Do nothing for REST
+ Logging.debug("[DB] REST Database connection closed")
+ self.isOpen = False
+
+ def _doSql(self, sql):
+ self._lastSql = sql # remember this, last SQL attempted
+ try:
+ r = requests.post(self._url,
+ data = sql,
+ auth = HTTPBasicAuth('root', 'taosdata'))
+ except:
+ print("REST API Failure (TODO: more info here)")
+ raise
+ rj = r.json()
+ # Sanity check for the "Json Result"
+ if ('status' not in rj):
+ raise RuntimeError("No status in REST response")
+
+ if rj['status'] == 'error': # clearly reported error
+ if ('code' not in rj): # error without code
+ raise RuntimeError("REST error return without code")
+ errno = rj['code'] # May need to massage this in the future
+ # print("Raising programming error with REST return: {}".format(rj))
+ raise taos.error.ProgrammingError(
+ rj['desc'], errno) # todo: check existance of 'desc'
+
+ if rj['status'] != 'succ': # better be this
+ raise RuntimeError(
+ "Unexpected REST return status: {}".format(
+ rj['status']))
+
+ nRows = rj['rows'] if ('rows' in rj) else 0
+ self._result = rj
+ return nRows
+
+ def execute(self, sql):
+ if (not self.isOpen):
+ raise RuntimeError(
+ "Cannot execute database commands until connection is open")
+ Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
+ nRows = self._doSql(sql)
+ Logging.debug(
+ "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
+ return nRows
+
+ def query(self, sql): # return rows affected
+ return self.execute(sql)
+
+ def getQueryResult(self):
+ return self._result['data']
+
+ def getResultRows(self):
+ print(self._result)
+ raise RuntimeError("TBD") # TODO: finish here to support -v under -c rest
+ # return self._tdSql.queryRows
+
+ def getResultCols(self):
+ print(self._result)
+ raise RuntimeError("TBD")
+
+ # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
+
+
+class MyTDSql:
+ # Class variables
+ _clsLock = threading.Lock() # class wide locking
+ longestQuery = None # type: str
+ longestQueryTime = 0.0 # seconds
+ lqStartTime = 0.0
+ # lqEndTime = 0.0 # Not needed, as we have the two above already
+
+ def __init__(self, hostAddr, cfgPath):
+ # Make the DB connection
+ self._conn = taos.connect(host=hostAddr, config=cfgPath)
+ self._cursor = self._conn.cursor()
+
+ self.queryRows = 0
+ self.queryCols = 0
+ self.affectedRows = 0
+
+ # def init(self, cursor, log=True):
+ # self.cursor = cursor
+ # if (log):
+ # caller = inspect.getframeinfo(inspect.stack()[1][0])
+ # self.cursor.log(caller.filename + ".sql")
+
+ def close(self):
+ self._cursor.close() # can we double close?
+ self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
+        # NOTE(review): removed redundant duplicate cursor.close(); cursor and connection are already closed above
+
+ def _execInternal(self, sql):
+ startTime = time.time()
+ ret = self._cursor.execute(sql)
+ # print("\nSQL success: {}".format(sql))
+ queryTime = time.time() - startTime
+ # Record the query time
+ cls = self.__class__
+ if queryTime > (cls.longestQueryTime + 0.01) :
+ with cls._clsLock:
+ cls.longestQuery = sql
+ cls.longestQueryTime = queryTime
+ cls.lqStartTime = startTime
+ return ret
+
+ def query(self, sql):
+ self.sql = sql
+ try:
+ self._execInternal(sql)
+ self.queryResult = self._cursor.fetchall()
+ self.queryRows = len(self.queryResult)
+ self.queryCols = len(self._cursor.description)
+ except Exception as e:
+ # caller = inspect.getframeinfo(inspect.stack()[1][0])
+ # args = (caller.filename, caller.lineno, sql, repr(e))
+ # tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
+ raise
+ return self.queryRows
+
+ def execute(self, sql):
+ self.sql = sql
+ try:
+ self.affectedRows = self._execInternal(sql)
+ except Exception as e:
+ # caller = inspect.getframeinfo(inspect.stack()[1][0])
+ # args = (caller.filename, caller.lineno, sql, repr(e))
+ # tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
+ raise
+ return self.affectedRows
+
+class DbTarget:
+ def __init__(self, cfgPath, hostAddr, port):
+ self.cfgPath = cfgPath
+ self.hostAddr = hostAddr
+ self.port = port
+
+ def __repr__(self):
+ return "[DbTarget: cfgPath={}, host={}:{}]".format(
+ Helper.getFriendlyPath(self.cfgPath), self.hostAddr, self.port)
+
+ def getEp(self):
+ return "{}:{}".format(self.hostAddr, self.port)
+
+class DbConnNative(DbConn):
+ # Class variables
+ _lock = threading.Lock()
+ # _connInfoDisplayed = False # TODO: find another way to display this
+ totalConnections = 0 # Not private
+
+ def __init__(self, dbTarget):
+ super().__init__(dbTarget)
+ self._type = self.TYPE_NATIVE
+ self._conn = None
+ # self._cursor = None
+
+ def openByType(self): # Open connection
+ # global gContainer
+ # tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
+ # cfgPath = self.getBuildPath() + "/test/cfg"
+ # cfgPath = tInst.getCfgDir()
+ # hostAddr = tInst.getHostAddr()
+
+ cls = self.__class__ # Get the class, to access class variables
+ with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
+ dbTarget = self._dbTarget
+ # if not cls._connInfoDisplayed:
+ # cls._connInfoDisplayed = True # updating CLASS variable
+ Logging.debug("Initiating TAOS native connection to {}".format(dbTarget))
+ # Make the connection
+ # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
+ # self._cursor = self._conn.cursor()
+ # Record the count in the class
+ self._tdSql = MyTDSql(dbTarget.hostAddr, dbTarget.cfgPath) # making DB connection
+ cls.totalConnections += 1
+
+ self._tdSql.execute('reset query cache')
+ # self._cursor.execute('use db') # do this at the beginning of every
+
+ # Open connection
+ # self._tdSql = MyTDSql()
+ # self._tdSql.init(self._cursor)
+
+ def close(self):
+ if (not self.isOpen):
+ raise RuntimeError("Cannot clean up database until connection is open")
+ self._tdSql.close()
+ # Decrement the class wide counter
+ cls = self.__class__ # Get the class, to access class variables
+ with cls._lock:
+ cls.totalConnections -= 1
+
+ Logging.debug("[DB] Database connection closed")
+ self.isOpen = False
+
+ def execute(self, sql):
+ if (not self.isOpen):
+ raise RuntimeError("Cannot execute database commands until connection is open")
+ Logging.debug("[SQL] Executing SQL: {}".format(sql))
+ self._lastSql = sql
+ nRows = self._tdSql.execute(sql)
+ Logging.debug(
+ "[SQL] Execution Result, nRows = {}, SQL = {}".format(
+ nRows, sql))
+ return nRows
+
+ def query(self, sql): # return rows affected
+ if (not self.isOpen):
+ raise RuntimeError(
+ "Cannot query database until connection is open")
+ Logging.debug("[SQL] Executing SQL: {}".format(sql))
+ self._lastSql = sql
+ nRows = self._tdSql.query(sql)
+ Logging.debug(
+ "[SQL] Query Result, nRows = {}, SQL = {}".format(
+ nRows, sql))
+ return nRows
+ # results are in: return self._tdSql.queryResult
+
+ def getQueryResult(self):
+ return self._tdSql.queryResult
+
+ def getResultRows(self):
+ return self._tdSql.queryRows
+
+ def getResultCols(self):
+ return self._tdSql.queryCols
+
+
+class DbManager():
+ ''' This is a wrapper around DbConn(), to make it easier to use.
+
+ TODO: rename this to DbConnManager
+ '''
+ def __init__(self, cType, dbTarget):
+ # self.tableNumQueue = LinearQueue() # TODO: delete?
+ # self.openDbServerConnection()
+ self._dbConn = DbConn.createNative(dbTarget) if (
+ cType == 'native') else DbConn.createRest(dbTarget)
+ try:
+ self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
+ except taos.error.ProgrammingError as err:
+ # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
+ if (err.msg == 'client disconnected'): # cannot open DB connection
+ print(
+ "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
+ sys.exit(2)
+ else:
+ print("Failed to connect to DB, errno = {}, msg: {}"
+ .format(Helper.convertErrno(err.errno), err.msg))
+ raise
+ except BaseException:
+ print("[=] Unexpected exception")
+ raise
+
+ # Do this after dbConn is in proper shape
+ # Moved to Database()
+ # self._stateMachine = StateMechine(self._dbConn)
+
+ def getDbConn(self):
+ return self._dbConn
+
+ # TODO: not used any more, to delete
+ def pickAndAllocateTable(self): # pick any table, and "use" it
+ return self.tableNumQueue.pickAndAllocate()
+
+ # TODO: Not used any more, to delete
+ def addTable(self):
+ with self._lock:
+ tIndex = self.tableNumQueue.push()
+ return tIndex
+
+ # Not used any more, to delete
+ def releaseTable(self, i): # return the table back, so others can use it
+ self.tableNumQueue.release(i)
+
+ # TODO: not used any more, delete
+ def getTableNameToDelete(self):
+ tblNum = self.tableNumQueue.pop() # TODO: race condition!
+ if (not tblNum): # maybe false
+ return False
+
+ return "table_{}".format(tblNum)
+
+ def cleanUp(self):
+ self._dbConn.close()
+
diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py
new file mode 100644
index 0000000000000000000000000000000000000000..8a2817b3898ac5ca0f5518b95274c0826e1c42b4
--- /dev/null
+++ b/tests/pytest/crash_gen/misc.py
@@ -0,0 +1,175 @@
+import threading
+import random
+import logging
+import os
+
+
+class CrashGenError(Exception):
+ def __init__(self, msg=None, errno=None):
+ self.msg = msg
+ self.errno = errno
+
+ def __str__(self):
+ return self.msg
+
+
+class LoggingFilter(logging.Filter):
+ def filter(self, record: logging.LogRecord):
+ if (record.levelno >= logging.INFO):
+ return True # info or above always log
+
+ # Commenting out below to adjust...
+
+ # if msg.startswith("[TRD]"):
+ # return False
+ return True
+
+
+class MyLoggingAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs
+ # return '[%s] %s' % (self.extra['connid'], msg), kwargs
+
+
+class Logging:
+ logger = None
+
+ @classmethod
+ def getLogger(cls):
+        return cls.logger  # was bare 'return logger' — NameError: no module-level 'logger' exists
+
+ @classmethod
+ def clsInit(cls, gConfig): # TODO: refactor away gConfig
+ if cls.logger:
+ return
+
+ # Logging Stuff
+ # global misc.logger
+ _logger = logging.getLogger('CrashGen') # real logger
+ _logger.addFilter(LoggingFilter())
+ ch = logging.StreamHandler()
+ _logger.addHandler(ch)
+
+ # Logging adapter, to be used as a logger
+ print("setting logger variable")
+ # global logger
+ cls.logger = MyLoggingAdapter(_logger, [])
+
+ if (gConfig.debug):
+ cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
+ else:
+ cls.logger.setLevel(logging.INFO)
+
+ @classmethod
+ def info(cls, msg):
+ cls.logger.info(msg)
+
+ @classmethod
+ def debug(cls, msg):
+ cls.logger.debug(msg)
+
+ @classmethod
+ def warning(cls, msg):
+ cls.logger.warning(msg)
+
+ @classmethod
+ def error(cls, msg):
+ cls.logger.error(msg)
+
+class Status:
+ STATUS_STARTING = 1
+ STATUS_RUNNING = 2
+ STATUS_STOPPING = 3
+ STATUS_STOPPED = 4
+
+ def __init__(self, status):
+ self.set(status)
+
+ def __repr__(self):
+ return "[Status: v={}]".format(self._status)
+
+ def set(self, status):
+ self._status = status
+
+ def get(self):
+ return self._status
+
+ def isStarting(self):
+ return self._status == Status.STATUS_STARTING
+
+ def isRunning(self):
+ # return self._thread and self._thread.is_alive()
+ return self._status == Status.STATUS_RUNNING
+
+ def isStopping(self):
+ return self._status == Status.STATUS_STOPPING
+
+ def isStopped(self):
+ return self._status == Status.STATUS_STOPPED
+
+ def isStable(self):
+ return self.isRunning() or self.isStopped()
+
+# Deterministic random number generator
+class Dice():
+ seeded = False # static, uninitialized
+
+ @classmethod
+ def seed(cls, s): # static
+ if (cls.seeded):
+ raise RuntimeError(
+ "Cannot seed the random generator more than once")
+ cls.verifyRNG()
+ random.seed(s)
+ cls.seeded = True # TODO: protect against multi-threading
+
+ @classmethod
+    def verifyRNG(cls): # Verify that the RNG is deterministic
+ random.seed(0)
+ x1 = random.randrange(0, 1000)
+ x2 = random.randrange(0, 1000)
+ x3 = random.randrange(0, 1000)
+ if (x1 != 864 or x2 != 394 or x3 != 776):
+ raise RuntimeError("System RNG is not deterministic")
+
+ @classmethod
+ def throw(cls, stop): # get 0 to stop-1
+ return cls.throwRange(0, stop)
+
+ @classmethod
+ def throwRange(cls, start, stop): # up to stop-1
+ if (not cls.seeded):
+ raise RuntimeError("Cannot throw dice before seeding it")
+ return random.randrange(start, stop)
+
+ @classmethod
+ def choice(cls, cList):
+ return random.choice(cList)
+
+class Helper:
+ @classmethod
+ def convertErrno(cls, errno):
+ return errno if (errno > 0) else 0x80000000 + errno
+
+ @classmethod
+ def getFriendlyPath(cls, path): # returns .../xxx/yyy
+ ht1 = os.path.split(path)
+ ht2 = os.path.split(ht1[0])
+ return ".../" + ht2[1] + '/' + ht1[1]
+
+
+class Progress:
+ STEP_BOUNDARY = 0
+ BEGIN_THREAD_STEP = 1
+ END_THREAD_STEP = 2
+ SERVICE_HEART_BEAT= 3
+ tokens = {
+ STEP_BOUNDARY: '.',
+ BEGIN_THREAD_STEP: '[',
+ END_THREAD_STEP: '] ',
+ SERVICE_HEART_BEAT: '.Y.'
+ }
+
+ @classmethod
+ def emit(cls, token):
+ print(cls.tokens[token], end="", flush=True)
diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py
new file mode 100644
index 0000000000000000000000000000000000000000..bb2becb55bd317c9d57152c89bd0ad6c3994de33
--- /dev/null
+++ b/tests/pytest/crash_gen/service_manager.py
@@ -0,0 +1,729 @@
+import os
+import io
+import sys
+import threading
+import signal
+import logging
+import time
+import subprocess
+
+from typing import IO, List
+
+try:
+ import psutil
+except:
+ print("Psutil module needed, please install: sudo pip3 install psutil")
+ sys.exit(-1)
+
+from queue import Queue, Empty
+
+from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
+from .db import DbConn, DbTarget
+
+class TdeInstance():
+ """
+ A class to capture the *static* information of a TDengine instance,
+    including the location of the various files/directories, and basic
+ configuration.
+ """
+
+ @classmethod
+ def _getBuildPath(cls):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("communit")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ buildPath = None
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ if buildPath == None:
+ raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
+ .format(selfPath, projPath))
+ return buildPath
+
+ def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
+ self._buildDir = self._getBuildPath()
+ self._subdir = '/' + subdir # TODO: tolerate "/"
+ self._port = port # TODO: support different IP address too
+ self._fepPort = fepPort
+
+ self._tInstNum = tInstNum
+ self._smThread = ServiceManagerThread()
+
+ def getDbTarget(self):
+ return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
+
+ def getPort(self):
+ return self._port
+
+ def __repr__(self):
+ return "[TdeInstance: {}, subdir={}]".format(
+ self._buildDir, Helper.getFriendlyPath(self._subdir))
+
+ def generateCfgFile(self):
+ # print("Logger = {}".format(logger))
+ # buildPath = self.getBuildPath()
+ # taosdPath = self._buildPath + "/build/bin/taosd"
+
+ cfgDir = self.getCfgDir()
+ cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed
+ if os.path.exists(cfgFile):
+ if os.path.isfile(cfgFile):
+ Logging.warning("Config file exists already, skip creation: {}".format(cfgFile))
+ return # cfg file already exists, nothing to do
+ else:
+ raise CrashGenError("Invalid config file: {}".format(cfgFile))
+ # Now that the cfg file doesn't exist
+ if os.path.exists(cfgDir):
+ if not os.path.isdir(cfgDir):
+ raise CrashGenError("Invalid config dir: {}".format(cfgDir))
+ # else: good path
+ else:
+ os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p"
+ # Now we have a good cfg dir
+ cfgValues = {
+ 'runDir': self.getRunDir(),
+ 'ip': '127.0.0.1', # TODO: change to a network addressable ip
+ 'port': self._port,
+ 'fepPort': self._fepPort,
+ }
+ cfgTemplate = """
+dataDir {runDir}/data
+logDir {runDir}/log
+
+charset UTF-8
+
+firstEp {ip}:{fepPort}
+fqdn {ip}
+serverPort {port}
+
+# was all 135 below
+dDebugFlag 135
+cDebugFlag 135
+rpcDebugFlag 135
+qDebugFlag 135
+# httpDebugFlag 143
+# asyncLog 0
+# tables 10
+maxtablesPerVnode 10
+rpcMaxTime 101
+# cache 2
+keep 36500
+# walLevel 2
+walLevel 1
+#
+# maxConnections 100
+"""
+ cfgContent = cfgTemplate.format_map(cfgValues)
+ f = open(cfgFile, "w")
+ f.write(cfgContent)
+ f.close()
+
+ def rotateLogs(self):
+ logPath = self.getLogDir()
+ # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
+ if os.path.exists(logPath):
+ logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
+ Logging.info("Saving old log files to: {}".format(logPathSaved))
+ os.rename(logPath, logPathSaved)
+ # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
+
+
+ def getExecFile(self): # .../taosd
+ return self._buildDir + "/build/bin/taosd"
+
+ def getRunDir(self): # TODO: rename to "root dir" ?!
+ return self._buildDir + self._subdir
+
+ def getCfgDir(self): # path, not file
+ return self.getRunDir() + "/cfg"
+
+ def getLogDir(self):
+ return self.getRunDir() + "/log"
+
+ def getHostAddr(self):
+ return "127.0.0.1"
+
+ def getServiceCmdLine(self): # to start the instance
+        return [self.getExecFile(), '-c', self.getCfgDir()] # used in subprocess.Popen()
+
+ def _getDnodes(self, dbc):
+ dbc.query("show dnodes")
+ cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
+ return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'}
+
+ def createDnode(self, dbt: DbTarget):
+ """
+ With a connection to the "first" EP, let's create a dnode for someone else who
+ wants to join.
+ """
+ dbc = DbConn.createNative(self.getDbTarget())
+ dbc.open()
+
+ if dbt.getEp() in self._getDnodes(dbc):
+ Logging.info("Skipping DNode creation for: {}".format(dbt))
+ dbc.close()
+ return
+
+ sql = "CREATE DNODE \"{}\"".format(dbt.getEp())
+ dbc.execute(sql)
+ dbc.close()
+
+ def getStatus(self):
+ return self._smThread.getStatus()
+
+ def getSmThread(self):
+ return self._smThread
+
+ def start(self):
+ if not self.getStatus().isStopped():
+ raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
+
+ Logging.info("Starting TDengine instance: {}".format(self))
+ self.generateCfgFile() # service side generates config file, client does not
+ self.rotateLogs()
+
+ self._smThread.start(self.getServiceCmdLine())
+
+ def stop(self):
+ self._smThread.stop()
+
+ def isFirst(self):
+ return self._tInstNum == 0
+
+
+class TdeSubProcess:
+ """
+ A class to to represent the actual sub process that is the run-time
+ of a TDengine instance.
+
+ It takes a TdeInstance object as its parameter, with the rationale being
+ "a sub process runs an instance".
+ """
+
+ # RET_ALREADY_STOPPED = -1
+ # RET_TIME_OUT = -3
+ # RET_SUCCESS = -4
+
+ def __init__(self):
+ self.subProcess = None
+ # if tInst is None:
+ # raise CrashGenError("Empty instance not allowed in TdeSubProcess")
+ # self._tInst = tInst # Default create at ServiceManagerThread
+
+ def getStdOut(self):
+ return self.subProcess.stdout
+
+ def getStdErr(self):
+ return self.subProcess.stderr
+
+ def isRunning(self):
+ return self.subProcess is not None
+
+ def getPid(self):
+ return self.subProcess.pid
+
+ def start(self, cmdLine):
+ ON_POSIX = 'posix' in sys.builtin_module_names
+
+ # Sanity check
+ if self.subProcess: # already there
+ raise RuntimeError("Corrupt process state")
+
+ self.subProcess = subprocess.Popen(
+ cmdLine,
+ shell=False,
+ # svcCmdSingle, shell=True, # capture core dump?
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ # bufsize=1, # not supported in binary mode
+ close_fds=ON_POSIX
+ ) # had text=True, which interferred with reading EOF
+
+ def stop(self):
+ """
+ Stop a sub process, and try to return a meaningful return code.
+
+ Common POSIX signal values (from man -7 signal):
+ SIGHUP 1
+ SIGINT 2
+ SIGQUIT 3
+ SIGILL 4
+ SIGTRAP 5
+ SIGABRT 6
+ SIGIOT 6
+ SIGBUS 7
+ SIGEMT -
+ SIGFPE 8
+ SIGKILL 9
+ SIGUSR1 10
+ SIGSEGV 11
+ SIGUSR2 12
+ """
+ if not self.subProcess:
+ print("Sub process already stopped")
+ return # -1
+
+ retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
+ if retCode: # valid return code, process ended
+ retCode = -retCode # only if valid
+ Logging.warning("TSP.stop(): process ended itself")
+ self.subProcess = None
+ return retCode
+
+ # process still alive, let's interrupt it
+ print("Terminate running process, send SIG_INT and wait...")
+ # sub process should end, then IPC queue should end, causing IO thread to end
+ self.subProcess.send_signal(signal.SIGINT)
+ self.subProcess.wait(20)
+ retCode = self.subProcess.returncode # should always be there
+ # May throw subprocess.TimeoutExpired exception above, therefore
+        # The process is guaranteed to have ended by now
+ self.subProcess = None
+ if retCode != 0: # != (- signal.SIGINT):
+ Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode))
+ else:
+ Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT")
+ return - retCode
+
+class ServiceManager:
+ PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
+
+ def __init__(self, numDnodes): # >1 when we run a cluster
+ Logging.info("TDengine Service Manager (TSM) created")
+ self._numDnodes = numDnodes # >1 means we have a cluster
+ self._lock = threading.Lock()
+ # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
+ # signal.signal(signal.SIGINT, self.sigIntHandler)
+ # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
+
+ self.inSigHandler = False
+ # self._status = MainExec.STATUS_RUNNING # set inside
+ # _startTaosService()
+ self._runCluster = (numDnodes > 1)
+ self._tInsts : List[TdeInstance] = []
+ for i in range(0, numDnodes):
+ ti = self._createTdeInstance(i) # construct tInst
+ self._tInsts.append(ti)
+
+ # self.svcMgrThreads : List[ServiceManagerThread] = []
+ # for i in range(0, numDnodes):
+ # thread = self._createThread(i) # construct tInst
+ # self.svcMgrThreads.append(thread)
+
+ def _createTdeInstance(self, dnIndex):
+ if not self._runCluster: # single instance
+ subdir = 'test'
+ else: # Create all threads in a cluster
+ subdir = 'cluster_dnode_{}'.format(dnIndex)
+ fepPort= 6030 # firstEP Port
+ port = fepPort + dnIndex * 100
+ return TdeInstance(subdir, dnIndex, port, fepPort)
+ # return ServiceManagerThread(dnIndex, ti)
+
+ def _doMenu(self):
+ choice = ""
+ while True:
+ print("\nInterrupting Service Program, Choose an Action: ")
+ print("1: Resume")
+ print("2: Terminate")
+ print("3: Restart")
+ # Remember to update the if range below
+ # print("Enter Choice: ", end="", flush=True)
+ while choice == "":
+ choice = input("Enter Choice: ")
+ if choice != "":
+ break # done with reading repeated input
+ if choice in ["1", "2", "3"]:
+ break # we are done with whole method
+ print("Invalid choice, please try again.")
+ choice = "" # reset
+ return choice
+
+ def sigUsrHandler(self, signalNumber, frame):
+ print("Interrupting main thread execution upon SIGUSR1")
+ if self.inSigHandler: # already
+ print("Ignoring repeated SIG...")
+ return # do nothing if it's already not running
+ self.inSigHandler = True
+
+ choice = self._doMenu()
+ if choice == "1":
+ self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
+ elif choice == "2":
+ self.stopTaosServices()
+ elif choice == "3": # Restart
+ self.restart()
+ else:
+ raise RuntimeError("Invalid menu choice: {}".format(choice))
+
+ self.inSigHandler = False
+
+ def sigIntHandler(self, signalNumber, frame):
+ print("ServiceManager: INT Signal Handler starting...")
+ if self.inSigHandler:
+ print("Ignoring repeated SIG_INT...")
+ return
+ self.inSigHandler = True
+
+ self.stopTaosServices()
+ print("ServiceManager: INT Signal Handler returning...")
+ self.inSigHandler = False
+
+ def sigHandlerResume(self):
+ print("Resuming TDengine service manager (main thread)...\n\n")
+
+ # def _updateThreadStatus(self):
+ # if self.svcMgrThread: # valid svc mgr thread
+ # if self.svcMgrThread.isStopped(): # done?
+ # self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
+ # self.svcMgrThread = None # no more
+
+ def isActive(self):
+ """
+ Determine if the service/cluster is active at all, i.e. at least
+ one thread is not "stopped".
+ """
+ for ti in self._tInsts:
+ if not ti.getStatus().isStopped():
+ return True
+ return False
+
+ # def isRestarting(self):
+ # """
+ # Determine if the service/cluster is being "restarted", i.e., at least
+ # one thread is in "restarting" status
+ # """
+ # for thread in self.svcMgrThreads:
+ # if thread.isRestarting():
+ # return True
+ # return False
+
+ def isStable(self):
+ """
+ Determine if the service/cluster is "stable", i.e. all of the
+ threads are in "stable" status.
+ """
+ for ti in self._tInsts:
+ if not ti.getStatus().isStable():
+ return False
+ return True
+
+ def _procIpcAll(self):
+ while self.isActive():
+ Progress.emit(Progress.SERVICE_HEART_BEAT)
+ for ti in self._tInsts: # all thread objects should always be valid
+ # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
+ status = ti.getStatus()
+ if status.isRunning():
+ th = ti.getSmThread()
+ th.procIpcBatch() # regular processing,
+ if status.isStopped():
+ th.procIpcBatch() # one last time?
+ # self._updateThreadStatus()
+
+ time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
+ # raise CrashGenError("dummy")
+ print("Service Manager Thread (with subprocess) ended, main thread exiting...")
+
+ def _getFirstInstance(self):
+ return self._tInsts[0]
+
+ def startTaosServices(self):
+ with self._lock:
+ if self.isActive():
+ raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")
+
+ # Find if there's already a taosd service, and then kill it
+ for proc in psutil.process_iter():
+ if proc.name() == 'taosd':
+ print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
+ time.sleep(2.0)
+ proc.kill()
+ # print("Process: {}".format(proc.name()))
+
+ # self.svcMgrThread = ServiceManagerThread() # create the object
+
+ for ti in self._tInsts:
+ ti.start()
+ if not ti.isFirst():
+ tFirst = self._getFirstInstance()
+ tFirst.createDnode(ti.getDbTarget())
+ ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
+
+ def stopTaosServices(self):
+ with self._lock:
+ if not self.isActive():
+ Logging.warning("Cannot stop TAOS service(s), already not active")
+ return
+
+ for ti in self._tInsts:
+ ti.stop()
+
+ def run(self):
+ self.startTaosServices()
+ self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
+ if self.isActive(): # if sig handler hasn't destroyed it by now
+ self.stopTaosServices() # should have started already
+
+ def restart(self):
+ if not self.isStable():
+ Logging.warning("Cannot restart service/cluster, when not stable")
+ return
+
+ # self._isRestarting = True
+ if self.isActive():
+ self.stopTaosServices()
+ else:
+ Logging.warning("Service not active when restart requested")
+
+ self.startTaosServices()
+ # self._isRestarting = False
+
+ # def isRunning(self):
+ # return self.svcMgrThread != None
+
+ # def isRestarting(self):
+ # return self._isRestarting
+
+class ServiceManagerThread:
+ """
+ A class representing a dedicated thread which manages the "sub process"
+ of the TDengine service, interacting with its STDOUT/ERR.
+
+ It takes a TdeInstance parameter at creation time, or create a default
+ """
+ MAX_QUEUE_SIZE = 10000
+
+ def __init__(self):
+ # Set the sub process
+ self._tdeSubProcess = None # type: TdeSubProcess
+
+ # Arrange the TDengine instance
+ # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
+ # self._tInst = tInst or TdeInstance() # Need an instance
+
+ self._thread = None # The actual thread, # type: threading.Thread
+ self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
+
+ def __repr__(self):
+ return "[SvcMgrThread: status={}, subProc={}]".format(
+ self.getStatus(), self._tdeSubProcess)
+
+ def getStatus(self):
+ return self._status
+
+ # Start the thread (with sub process), and wait for the sub service
+ # to become fully operational
+ def start(self, cmdLine):
+ if self._thread:
+ raise RuntimeError("Unexpected _thread")
+ if self._tdeSubProcess:
+ raise RuntimeError("TDengine sub process already created/running")
+
+ Logging.info("Attempting to start TAOS service: {}".format(self))
+
+ self._status.set(Status.STATUS_STARTING)
+ self._tdeSubProcess = TdeSubProcess()
+ self._tdeSubProcess.start(cmdLine)
+
+ self._ipcQueue = Queue()
+ self._thread = threading.Thread( # First thread captures server OUTPUT
+ target=self.svcOutputReader,
+ args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
+ self._thread.daemon = True # thread dies with the program
+ self._thread.start()
+
+ self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
+ target=self.svcErrorReader,
+ args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
+ self._thread2.daemon = True # thread dies with the program
+ self._thread2.start()
+
+ # wait for service to start
+ for i in range(0, 100):
+ time.sleep(1.0)
+ # self.procIpcBatch() # don't pump message during start up
+ print("_zz_", end="", flush=True)
+ if self._status.isRunning():
+ Logging.info("[] TDengine service READY to process requests")
+ Logging.info("[] TAOS service started: {}".format(self))
+ # self._verifyDnode(self._tInst) # query and ensure dnode is ready
+ # Logging.debug("[] TAOS Dnode verified: {}".format(self))
+ return # now we've started
+ # TODO: handle failure-to-start better?
+        self.procIpcBatch(100, True) # display output before conking out, trim to last 100 msgs, force output
+ raise RuntimeError("TDengine service did not start successfully: {}".format(self))
+
+ def _verifyDnode(self, tInst: TdeInstance):
+ dbc = DbConn.createNative(tInst.getDbTarget())
+ dbc.open()
+ dbc.query("show dnodes")
+ # dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
+ cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
+ # ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
+ isValid = False
+ for col in cols:
+ # print("col = {}".format(col))
+ ep = col[1].split(':') # 10.1.30.2:6030
+ print("Found ep={}".format(ep))
+ if tInst.getPort() == int(ep[1]): # That's us
+ # print("Valid Dnode matched!")
+ isValid = True # now we are valid
+ break
+ if not isValid:
+ print("Failed to start dnode, sleep for a while")
+ time.sleep(600)
+ raise RuntimeError("Failed to start Dnode, expected port not found: {}".
+ format(tInst.getPort()))
+ dbc.close()
+
+ def stop(self):
+ # can be called from both main thread or signal handler
+ print("Terminating TDengine service running as the sub process...")
+ if self.getStatus().isStopped():
+ print("Service already stopped")
+ return
+ if self.getStatus().isStopping():
+ print("Service is already being stopped")
+ return
+ # Linux will send Control-C generated SIGINT to the TDengine process
+ # already, ref:
+ # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
+ if not self._tdeSubProcess:
+ raise RuntimeError("sub process object missing")
+
+ self._status.set(Status.STATUS_STOPPING)
+ # retCode = self._tdeSubProcess.stop()
+ try:
+ retCode = self._tdeSubProcess.stop()
+ # print("Attempted to stop sub process, got return code: {}".format(retCode))
+ if retCode == signal.SIGSEGV : # SGV
+ Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
+ except subprocess.TimeoutExpired as err:
+ print("Time out waiting for TDengine service process to exit")
+ else:
+ if self._tdeSubProcess.isRunning(): # still running, should now never happen
+ print("FAILED to stop sub process, it is still running... pid = {}".format(
+ self._tdeSubProcess.getPid()))
+ else:
+ self._tdeSubProcess = None # not running any more
+ self.join() # stop the thread, change the status, etc.
+
+ # Check if it's really stopped
+ outputLines = 10 # for last output
+ if self.getStatus().isStopped():
+ self.procIpcBatch(outputLines) # one last time
+ Logging.debug("End of TDengine Service Output: {}".format(self))
+ Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
+ else:
+ print("WARNING: SMT did not terminate as expected: {}".format(self))
+
+ def join(self):
+ # TODO: sanity check
+ if not self.getStatus().isStopping():
+ raise RuntimeError(
+ "SMT.Join(): Unexpected status: {}".format(self._status))
+
+ if self._thread:
+ self._thread.join()
+ self._thread = None
+ self._status.set(Status.STATUS_STOPPED)
+ # STD ERR thread
+ self._thread2.join()
+ self._thread2 = None
+ else:
+ print("Joining empty thread, doing nothing")
+
+ def _trimQueue(self, targetSize):
+ if targetSize <= 0:
+ return # do nothing
+ q = self._ipcQueue
+ if (q.qsize() <= targetSize): # no need to trim
+ return
+
+        Logging.debug("Trimming IPC queue to target size: {}".format(targetSize))
+ itemsToTrim = q.qsize() - targetSize
+ for i in range(0, itemsToTrim):
+ try:
+ q.get_nowait()
+ except Empty:
+ break # break out of for loop, no more trimming
+
+ TD_READY_MSG = "TDengine is initialized successfully"
+
+ def procIpcBatch(self, trimToTarget=0, forceOutput=False):
+ self._trimQueue(trimToTarget) # trim if necessary
+ # Process all the output generated by the underlying sub process,
+ # managed by IO thread
+ print("<", end="", flush=True)
+ while True:
+ try:
+ line = self._ipcQueue.get_nowait() # getting output at fast speed
+ self._printProgress("_o")
+ except Empty:
+ # time.sleep(2.3) # wait only if there's no output
+ # no more output
+ print(".>", end="", flush=True)
+ return # we are done with THIS BATCH
+ else: # got line, printing out
+ if forceOutput:
+ Logging.info(line)
+ else:
+ Logging.debug(line)
+ print(">", end="", flush=True)
+
+ _ProgressBars = ["--", "//", "||", "\\\\"]
+
+ def _printProgress(self, msg): # TODO: assuming 2 chars
+ print(msg, end="", flush=True)
+ pBar = self._ProgressBars[Dice.throw(4)]
+ print(pBar, end="", flush=True)
+ print('\b\b\b\b', end="", flush=True)
+
+ def svcOutputReader(self, out: IO, queue):
+ # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
+ # print("This is the svcOutput Reader...")
+ # for line in out :
+ for line in iter(out.readline, b''):
+ # print("Finished reading a line: {}".format(line))
+ # print("Adding item to queue...")
+ try:
+ line = line.decode("utf-8").rstrip()
+ except UnicodeError:
+ print("\nNon-UTF8 server output: {}\n".format(line))
+
+ # This might block, and then causing "out" buffer to block
+ queue.put(line)
+ self._printProgress("_i")
+
+ if self._status.isStarting(): # we are starting, let's see if we have started
+ if line.find(self.TD_READY_MSG) != -1: # found
+ Logging.info("Waiting for the service to become FULLY READY")
+ time.sleep(1.0) # wait for the server to truly start. TODO: remove this
+ Logging.info("Service is now FULLY READY") # TODO: more ID info here?
+ self._status.set(Status.STATUS_RUNNING)
+
+ # Trim the queue if necessary: TODO: try this 1 out of 10 times
+ self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
+
+ if self._status.isStopping(): # TODO: use thread status instead
+                # WAITING for the stopping sub process to finish its output
+ print("_w", end="", flush=True)
+
+ # queue.put(line)
+ # meaning sub process must have died
+ Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
+ out.close()
+
+ def svcErrorReader(self, err: IO, queue):
+ for line in iter(err.readline, b''):
+ print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
+ Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
+ err.close()
\ No newline at end of file
diff --git a/tests/pytest/crash_gen_bootstrap.py b/tests/pytest/crash_gen_bootstrap.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3417d21a85ec5ea26c7ebc22ffe398fc436eebe
--- /dev/null
+++ b/tests/pytest/crash_gen_bootstrap.py
@@ -0,0 +1,23 @@
+# -----!/usr/bin/python3.7
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+import sys
+from crash_gen.crash_gen import MainExec
+
+if __name__ == "__main__":
+
+ mExec = MainExec()
+ mExec.init()
+ exitCode = mExec.run()
+
+ print("Exiting with code: {}".format(exitCode))
+ sys.exit(exitCode)
diff --git a/tests/pytest/query/querySort.py b/tests/pytest/query/querySort.py
index e5d3c8ce1f4eb9c1d2003bd659771562c9ea14e5..649e0dc1cb3191ba08b3f2da0a5edee3afc66575 100644
--- a/tests/pytest/query/querySort.py
+++ b/tests/pytest/query/querySort.py
@@ -96,6 +96,12 @@ class TDTestCase:
tdSql.query("select * from st order by ts desc")
self.checkColumnSorted(0, "desc")
+ print("======= step 2: verify order for special column =========")
+
+ tdSql.query("select tbcol1 from st order by ts desc")
+
+ tdSql.query("select tbcol6 from st order by ts desc")
+
for i in range(1, 10):
tdSql.error("select * from st order by tbcol%d" % i)
tdSql.error("select * from st order by tbcol%d asc" % i)