diff --git a/tests/pytest/crash_gen/README.md b/tests/pytest/crash_gen/README.md new file mode 100644 index 0000000000000000000000000000000000000000..6788ab1a63d0a7c515558695605d1ec8ac5fb7f9 --- /dev/null +++ b/tests/pytest/crash_gen/README.md @@ -0,0 +1,130 @@ +

User's Guide to the Crash_Gen Tool

+ +# Introduction + +To effectively test and debug our TDengine product, we have developed a simple tool to +exercise various functions of the system in a randomized fashion, hoping to expose +maximum number of problems, hopefully without a pre-determined scenario. + +# Preparation + +To run this tool, please ensure the followed preparation work is done first. + +1. Fetch a copy of the TDengine source code, and build it successfully in the `build/` + directory +1. Ensure that the system has Python3.8 or above properly installed. We use + Ubuntu 20.04LTS as our own development environment, and suggest you also use such + an environment if possible. + +# Simple Execution + +To run the tool with the simplest method, follow the steps below: + +1. Open a terminal window, start the `taosd` service in the `build/` directory + (or however you prefer to start the `taosd` service) +1. Open another terminal window, go into the `tests/pytest/` directory, and + run `./crash_gen.sh -p -t 3 -s 10` (change the two parameters here as you wish) +1. Watch the output to the end and see if you get a `SUCCESS` or `FAILURE` + +That's it! + +# Running Clusters + +This tool also makes it easy to test/verify the clustering capabilities of TDengine. You +can start a cluster quite easily with the following command: + +``` +$ cd tests/pytest/ +$ ./crash_gen.sh -e -o 3 +``` + +The `-e` option above tells the tool to start the service, and do not run any tests, while +the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster. +Obviously you can adjust the the number here. + +## Behind the Scenes + +When the tool runs a cluster, it users a number of directories, each holding the information +for a single DNode, see: + +``` +$ ls build/cluster* +build/cluster_dnode_0: +cfg data log + +build/cluster_dnode_1: +cfg data log + +build/cluster_dnode_2: +cfg data log +``` + +Therefore, when something goes wrong and you want to reset everything with the cluster, simple +erase all the files: + +``` +$ rm -rf build/cluster_dnode_* +``` + +## Addresses and Ports + +The DNodes in the cluster all binds the the `127.0.0.1` IP address (for now anyway), and +uses port 6030 for the first DNode, and 6130 for the 2nd one, and so on. + +## Testing Against a Cluster + +In a separate terminal window, you can invoke the tool in client mode and test against +a cluster, such as: + +``` +$ ./crash_gen.sh -p -t 10 -s 100 -i 3 +``` + +Here the `-i` option tells the tool to always create tables with 3 replicas, and run +all tests against such tables. + +# Additional Features + +The exhaustive features of the tool is available through the `-h` option: + +``` +$ ./crash_gen.sh -h +usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r] + [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x] + +TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below) +--------------------------------------------------------------------- +1. You build TDengine in the top level ./build directory, as described in offical docs +2. You run the server there before this script: ./build/bin/taosd -c test/cfg + +optional arguments: + -h, --help show this help message and exit + -a, --auto-start-service + Automatically start/stop the TDengine service (default: false) + -b MAX_DBS, --max-dbs MAX_DBS + Maximum number of DBs to keep, set to disable dropping DB. (default: 0) + -c CONNECTOR_TYPE, --connector-type CONNECTOR_TYPE + Connector type to use: native, rest, or mixed (default: 10) + -d, --debug Turn on DEBUG mode for more logging (default: false) + -e, --run-tdengine Run TDengine service in foreground (default: false) + -g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS + Ignore error codes, comma separated, 0x supported (default: None) + -i MAX_REPLICAS, --max-replicas MAX_REPLICAS + Maximum number of replicas to use, when testing against clusters. (default: 1) + -l, --larger-data Write larger amount of data during write operations (default: false) + -n, --dynamic-db-table-names + Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false) + -o NUM_DNODES, --num-dnodes NUM_DNODES + Number of Dnodes to initialize, used with -e option. (default: 1) + -p, --per-thread-db-connection + Use a single shared db connection (default: false) + -r, --record-ops Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false) + -s MAX_STEPS, --max-steps MAX_STEPS + Maximum number of steps to run (default: 100) + -t NUM_THREADS, --num-threads NUM_THREADS + Number of threads to run (default: 10) + -v, --verify-data Verify data written in a number of places by reading back (default: false) + -x, --continue-on-exception + Continue execution after encountering unexpected/disallowed errors/exceptions (default: false) +``` + diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index 2d52d274c3bcf008d1c430bacdaa7523de72fea9..74e3964d5a98420c1353e35f5043160cd4d60fa5 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -18,6 +18,7 @@ from __future__ import annotations from typing import Set from typing import Dict from typing import List +from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none import textwrap import time @@ -62,9 +63,10 @@ gContainer: Container class WorkerThread: - def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, - # te: TaskExecutor, - ): # note: main thread context! + def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator): + """ + Note: this runs in the main thread context + """ # self._curStep = -1 self._pool = pool self._tid = tid @@ -1007,6 +1009,8 @@ class Database: possibly in a cluster environment. For now we use it to manage state transitions in that database + + TODO: consider moving, but keep in mind it contains "StateMachine" ''' _clsLock = threading.Lock() # class wide lock _lastInt = 101 # next one is initial integer @@ -1182,7 +1186,7 @@ class Task(): def __init__(self, execStats: ExecutionStats, db: Database): self._workerThread = None - self._err = None # type: Exception + self._err: Optional[Exception] = None self._aborted = False self._curStep = None self._numRows = None # Number of rows affected @@ -1318,10 +1322,11 @@ class Task(): self._aborted = True traceback.print_exc() except BaseException: # TODO: what is this again??!! - self.logDebug( - "[=] Unexpected exception, SQL: {}".format( - wt.getDbConn().getLastSql())) - raise + raise RuntimeError("Punt") + # self.logDebug( + # "[=] Unexpected exception, SQL: {}".format( + # wt.getDbConn().getLastSql())) + # raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self.logDebug("[X] task execution completed, {}, status: {}".format( @@ -1498,7 +1503,8 @@ class TaskCreateDb(StateTransitionTask): # was: self.execWtSql(wt, "create database db") repStr = "" if gConfig.max_replicas != 1: - numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N + # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N + numReplica = gConfig.max_replicas # fixed, always repStr = "replica {}".format(numReplica) self.execWtSql(wt, "create database {} {}" .format(self._db.getName(), repStr) ) @@ -2050,7 +2056,7 @@ class ClientManager: class MainExec: def __init__(self): self._clientMgr = None - self._svcMgr = None + self._svcMgr = None # type: ServiceManager signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler) @@ -2063,17 +2069,16 @@ class MainExec: self._svcMgr.sigUsrHandler(signalNumber, frame) def sigIntHandler(self, signalNumber, frame): - if self._svcMgr: + if self._svcMgr: self._svcMgr.sigIntHandler(signalNumber, frame) - if self._clientMgr: + if self._clientMgr: self._clientMgr.sigIntHandler(signalNumber, frame) def runClient(self): global gSvcMgr if gConfig.auto_start_service: - self._svcMgr = ServiceManager() - gSvcMgr = self._svcMgr # hack alert - self._svcMgr.startTaosService() # we start, don't run + gSvcMgr = self._svcMgr = ServiceManager() # hack alert + gSvcMgr.startTaosService() # we start, don't run self._clientMgr = ClientManager() ret = None @@ -2086,12 +2091,10 @@ class MainExec: def runService(self): global gSvcMgr - self._svcMgr = ServiceManager() - gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert + gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert - self._svcMgr.run() # run to some end state - self._svcMgr = None - gSvcMgr = None + gSvcMgr.run() # run to some end state + gSvcMgr = self._svcMgr = None def init(self): # TODO: refactor global gContainer @@ -2165,6 +2168,13 @@ class MainExec: '--dynamic-db-table-names', action='store_true', help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)') + parser.add_argument( + '-o', + '--num-dnodes', + action='store', + default=1, + type=int, + help='Number of Dnodes to initialize, used with -e option. (default: 1)') parser.add_argument( '-p', '--per-thread-db-connection', @@ -2209,7 +2219,12 @@ class MainExec: def run(self): if gConfig.run_tdengine: # run server - self.runService() + try: + self.runService() + return 0 # success + except ConnectionError as err: + Logging.error("Failed to make DB connection, please check DB instance manually") + return -1 # failure else: return self.runClient() diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py index 5404382bf0e344f3080ded1bc9b1a688d9845931..43c855647c03d1de3e55393eb85c77250a00a602 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/db.py @@ -12,7 +12,9 @@ from util.cases import * from util.dnodes import * from util.log import * -from .misc import Logging, CrashGenError, Helper +from .misc import Logging, CrashGenError, Helper, Dice +import os +import datetime # from .service_manager import TdeInstance class DbConn: @@ -44,6 +46,9 @@ class DbConn: self._lastSql = None self._dbTarget = dbTarget + def __repr__(self): + return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget) + def getLastSql(self): return self._lastSql @@ -54,7 +59,7 @@ class DbConn: # below implemented by child classes self.openByType() - Logging.debug("[DB] data connection opened, type = {}".format(self._type)) + Logging.debug("[DB] data connection opened: {}".format(self)) self.isOpen = True def close(self): @@ -277,15 +282,18 @@ class DbTarget: self.cfgPath = cfgPath self.hostAddr = hostAddr self.port = port - + def __repr__(self): return "[DbTarget: cfgPath={}, host={}:{}]".format( - self.cfgPath, self.hostAddr, self.port) + Helper.getFriendlyPath(self.cfgPath), self.hostAddr, self.port) + + def getEp(self): + return "{}:{}".format(self.hostAddr, self.port) class DbConnNative(DbConn): # Class variables _lock = threading.Lock() - _connInfoDisplayed = False + # _connInfoDisplayed = False # TODO: find another way to display this totalConnections = 0 # Not private def __init__(self, dbTarget): @@ -304,9 +312,9 @@ class DbConnNative(DbConn): cls = self.__class__ # Get the class, to access class variables with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! dbTarget = self._dbTarget - if not cls._connInfoDisplayed: - cls._connInfoDisplayed = True # updating CLASS variable - Logging.info("Initiating TAOS native connection to {}".format(dbTarget)) + # if not cls._connInfoDisplayed: + # cls._connInfoDisplayed = True # updating CLASS variable + Logging.debug("Initiating TAOS native connection to {}".format(dbTarget)) # Make the connection # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable # self._cursor = self._conn.cursor() @@ -424,3 +432,4 @@ class DbManager(): def cleanUp(self): self._dbConn.close() + diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 08e50e5070025f568cb5f65091d527732e8c4d28..8a2817b3898ac5ca0f5518b95274c0826e1c42b4 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -1,6 +1,7 @@ import threading import random import logging +import os class CrashGenError(Exception): @@ -26,7 +27,7 @@ class LoggingFilter(logging.Filter): class MyLoggingAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): - return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs + return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs @@ -71,12 +72,44 @@ class Logging: def warning(cls, msg): cls.logger.warning(msg) + @classmethod + def error(cls, msg): + cls.logger.error(msg) + class Status: STATUS_STARTING = 1 STATUS_RUNNING = 2 STATUS_STOPPING = 3 STATUS_STOPPED = 4 + def __init__(self, status): + self.set(status) + + def __repr__(self): + return "[Status: v={}]".format(self._status) + + def set(self, status): + self._status = status + + def get(self): + return self._status + + def isStarting(self): + return self._status == Status.STATUS_STARTING + + def isRunning(self): + # return self._thread and self._thread.is_alive() + return self._status == Status.STATUS_RUNNING + + def isStopping(self): + return self._status == Status.STATUS_STOPPING + + def isStopped(self): + return self._status == Status.STATUS_STOPPED + + def isStable(self): + return self.isRunning() or self.isStopped() + # Deterministic random number generator class Dice(): seeded = False # static, uninitialized @@ -118,14 +151,23 @@ class Helper: def convertErrno(cls, errno): return errno if (errno > 0) else 0x80000000 + errno + @classmethod + def getFriendlyPath(cls, path): # returns .../xxx/yyy + ht1 = os.path.split(path) + ht2 = os.path.split(ht1[0]) + return ".../" + ht2[1] + '/' + ht1[1] + + class Progress: STEP_BOUNDARY = 0 BEGIN_THREAD_STEP = 1 END_THREAD_STEP = 2 + SERVICE_HEART_BEAT= 3 tokens = { STEP_BOUNDARY: '.', BEGIN_THREAD_STEP: '[', - END_THREAD_STEP: '] ' + END_THREAD_STEP: '] ', + SERVICE_HEART_BEAT: '.Y.' } @classmethod diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 11e35b6de848e3fb673e79e33f3fc221f3ec8cae..c85f64fde47314003909b8d92830aaa66a4694f1 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -7,7 +7,7 @@ import logging import time import subprocess -from typing import IO +from typing import IO, List try: import psutil @@ -17,7 +17,7 @@ except: from queue import Queue, Empty -from .misc import Logging, Status, CrashGenError, Dice +from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress from .db import DbConn, DbTarget class TdeInstance(): @@ -47,12 +47,15 @@ class TdeInstance(): .format(selfPath, projPath)) return buildPath - def __init__(self, subdir='test', port=6030, fepPort=6030): + def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030): self._buildDir = self._getBuildPath() self._subdir = '/' + subdir # TODO: tolerate "/" self._port = port # TODO: support different IP address too self._fepPort = fepPort + self._tInstNum = tInstNum + self._smThread = ServiceManagerThread() + def getDbTarget(self): return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) @@ -60,7 +63,8 @@ class TdeInstance(): return self._port def __repr__(self): - return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir) + return "[TdeInstance: {}, subdir={}]".format( + self._buildDir, Helper.getFriendlyPath(self._subdir)) def generateCfgFile(self): # print("Logger = {}".format(logger)) @@ -146,8 +150,52 @@ walLevel 1 def getHostAddr(self): return "127.0.0.1" - def getServiceCommand(self): # to start the instance + def getServiceCmdLine(self): # to start the instance return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() + + def _getDnodes(self, dbc): + dbc.query("show dnodes") + cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason + return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'} + + def createDnode(self, dbt: DbTarget): + """ + With a connection to the "first" EP, let's create a dnode for someone else who + wants to join. + """ + dbc = DbConn.createNative(self.getDbTarget()) + dbc.open() + + if dbt.getEp() in self._getDnodes(dbc): + Logging.info("Skipping DNode creation for: {}".format(dbt)) + dbc.close() + return + + sql = "CREATE DNODE \"{}\"".format(dbt.getEp()) + dbc.execute(sql) + dbc.close() + + def getStatus(self): + return self._smThread.getStatus() + + def getSmThread(self): + return self._smThread + + def start(self): + if not self.getStatus().isStopped(): + raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus())) + + Logging.info("Starting TDengine instance: {}".format(self)) + self.generateCfgFile() # service side generates config file, client does not + self.rotateLogs() + + self._smThread.start(self.getServiceCmdLine()) + + def stop(self): + self._smThread.stop() + + def isFirst(self): + return self._tInstNum == 0 class TdeSubProcess: @@ -159,11 +207,15 @@ class TdeSubProcess: "a sub process runs an instance". """ - def __init__(self, tInst : TdeInstance): + # RET_ALREADY_STOPPED = -1 + # RET_TIME_OUT = -3 + # RET_SUCCESS = -4 + + def __init__(self): self.subProcess = None - if tInst is None: - raise CrashGenError("Empty instance not allowed in TdeSubProcess") - self._tInst = tInst # Default create at ServiceManagerThread + # if tInst is None: + # raise CrashGenError("Empty instance not allowed in TdeSubProcess") + # self._tInst = tInst # Default create at ServiceManagerThread def getStdOut(self): return self.subProcess.stdout @@ -177,38 +229,15 @@ class TdeSubProcess: def getPid(self): return self.subProcess.pid - # Repalced by TdeInstance class - # def getBuildPath(self): - # selfPath = os.path.dirname(os.path.realpath(__file__)) - # if ("community" in selfPath): - # projPath = selfPath[:selfPath.find("communit")] - # else: - # projPath = selfPath[:selfPath.find("tests")] - - # for root, dirs, files in os.walk(projPath): - # if ("taosd" in files): - # rootRealPath = os.path.dirname(os.path.realpath(root)) - # if ("packaging" not in rootRealPath): - # buildPath = root[:len(root) - len("/build/bin")] - # break - # return buildPath - - def start(self): + def start(self, cmdLine): ON_POSIX = 'posix' in sys.builtin_module_names # Sanity check if self.subProcess: # already there raise RuntimeError("Corrupt process state") - - # global gContainer - # tInst = gContainer.defTdeInstance = TdeInstance('test3') # creae the instance - self._tInst.generateCfgFile() # service side generates config file, client does not - - self._tInst.rotateLogs() - - print("Starting TDengine instance: {}".format(self._tInst)) + self.subProcess = subprocess.Popen( - self._tInst.getServiceCommand(), + cmdLine, shell=False, # svcCmdSingle, shell=True, # capture core dump? stdout=subprocess.PIPE, @@ -218,31 +247,50 @@ class TdeSubProcess: ) # had text=True, which interferred with reading EOF def stop(self): + """ + Stop a sub process, and try to return a meaningful return code. + + Common POSIX signal values (from man -7 signal): + SIGHUP 1 + SIGINT 2 + SIGQUIT 3 + SIGILL 4 + SIGTRAP 5 + SIGABRT 6 + SIGIOT 6 + SIGBUS 7 + SIGEMT - + SIGFPE 8 + SIGKILL 9 + SIGUSR1 10 + SIGSEGV 11 + SIGUSR2 12 + """ if not self.subProcess: print("Sub process already stopped") - return -1 + return # -1 - retCode = self.subProcess.poll() # contains real sub process return code + retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) if retCode: # valid return code, process ended + retCode = -retCode # only if valid + Logging.warning("TSP.stop(): process ended itself") self.subProcess = None - else: # process still alive, let's interrupt it - print( - "Sub process is running, sending SIG_INT and waiting for it to terminate...") - # sub process should end, then IPC queue should end, causing IO - # thread to end - self.subProcess.send_signal(signal.SIGINT) - try: - self.subProcess.wait(10) - retCode = self.subProcess.returncode - except subprocess.TimeoutExpired as err: - print("Time out waiting for TDengine service process to exit") - retCode = -3 - else: - print("TDengine service process terminated successfully from SIG_INT") - retCode = -4 - self.subProcess = None - return retCode - + return retCode + + # process still alive, let's interrupt it + print("Terminate running process, send SIG_INT and wait...") + # sub process should end, then IPC queue should end, causing IO thread to end + self.subProcess.send_signal(signal.SIGINT) + self.subProcess.wait(20) + retCode = self.subProcess.returncode # should always be there + # May throw subprocess.TimeoutExpired exception above, therefore + # The process is guranteed to have ended by now + self.subProcess = None + if retCode != 0: # != (- signal.SIGINT): + Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode)) + else: + Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT") + return - retCode class ServiceManager: PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process @@ -259,19 +307,25 @@ class ServiceManager: # self._status = MainExec.STATUS_RUNNING # set inside # _startTaosService() self._runCluster = (numDnodes >= 1) - self.svcMgrThreads = [] # type: List[ServiceManagerThread] + self._tInsts : List[TdeInstance] = [] for i in range(0, numDnodes): - self.svcMgrThreads.append(ServiceManagerThread(i)) + ti = self._createTdeInstance(i) # construct tInst + self._tInsts.append(ti) + + # self.svcMgrThreads : List[ServiceManagerThread] = [] + # for i in range(0, numDnodes): + # thread = self._createThread(i) # construct tInst + # self.svcMgrThreads.append(thread) - def _createThread(self, dnIndex): - if not self._runCluster: # single instance - return ServiceManagerThread(0) + def _createTdeInstance(self, dnIndex): + # if not self._runCluster: # single instance + # return ServiceManagerThread(0) # Create all threads in a cluster subdir = 'cluster_dnode_{}'.format(dnIndex) fepPort= 6030 # firstEP Port port = fepPort + dnIndex * 100 - ti = TdeInstance(subdir, port, fepPort) - return ServiceManagerThread(dnIndex, ti) + return TdeInstance(subdir, dnIndex, port, fepPort) + # return ServiceManagerThread(dnIndex, ti) def _doMenu(self): choice = "" @@ -336,8 +390,8 @@ class ServiceManager: Determine if the service/cluster is active at all, i.e. at least one thread is not "stopped". """ - for thread in self.svcMgrThreads: - if not thread.isStopped(): + for ti in self._tInsts: + if not ti.getStatus().isStopped(): return True return False @@ -356,28 +410,31 @@ class ServiceManager: Determine if the service/cluster is "stable", i.e. all of the threads are in "stable" status. """ - for thread in self.svcMgrThreads: - if not thread.isStable(): + for ti in self._tInsts: + if not ti.isStable(): return False return True def _procIpcAll(self): while self.isActive(): - for thread in self.svcMgrThreads: # all thread objects should always be valid + Progress.emit(Progress.SERVICE_HEART_BEAT) + for ti in self._tInsts: # all thread objects should always be valid # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here - if thread.isRunning(): - thread.procIpcBatch() # regular processing, - if thread.isStopped(): - thread.procIpcBatch() # one last time? + status = ti.getStatus() + if status.isRunning(): + th = ti.getSmThread() + th.procIpcBatch() # regular processing, + if status.isStopped(): + th.procIpcBatch() # one last time? # self._updateThreadStatus() - elif thread.isRetarting(): - print("Service restarting...") - # else this thread is stopped time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round # raise CrashGenError("dummy") print("Service Manager Thread (with subprocess) ended, main thread exiting...") + def _getFirstInstance(self): + return self._tInsts[0] + def startTaosServices(self): with self._lock: if self.isActive(): @@ -386,15 +443,19 @@ class ServiceManager: # Find if there's already a taosd service, and then kill it for proc in psutil.process_iter(): if proc.name() == 'taosd': - print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") + print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") time.sleep(2.0) proc.kill() # print("Process: {}".format(proc.name())) # self.svcMgrThread = ServiceManagerThread() # create the object - for thread in self.svcMgrThreads: - thread.start() - thread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines + + for ti in self._tInsts: + ti.start() + if not ti.isFirst(): + tFirst = self._getFirstInstance() + tFirst.createDnode(ti.getDbTarget()) + ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines def stopTaosServices(self): with self._lock: @@ -402,8 +463,8 @@ class ServiceManager: Logging.warning("Cannot stop TAOS service(s), already not active") return - for thread in self.svcMgrThreads: - thread.stop() + for ti in self._tInsts: + ti.stop() def run(self): self.startTaosServices() @@ -412,7 +473,7 @@ class ServiceManager: self.stopTaosServices() # should have started already def restart(self): - if not self.isStable(): + if not self.getStatus().isStable(): Logging.warning("Cannot restart service/cluster, when not stable") return @@ -440,42 +501,27 @@ class ServiceManagerThread: """ MAX_QUEUE_SIZE = 10000 - def __init__(self, tInstNum = 0, tInst : TdeInstance = None): + def __init__(self): # Set the sub process self._tdeSubProcess = None # type: TdeSubProcess # Arrange the TDengine instance - self._tInstNum = tInstNum # instance serial number in cluster, ZERO based - self._tInst = tInst or TdeInstance() # Need an instance + # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based + # self._tInst = tInst or TdeInstance() # Need an instance self._thread = None # The actual thread, # type: threading.Thread - self._status = Status.STATUS_STOPPED # The status of the underlying service, actually. + self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually. def __repr__(self): - return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum) + return "[SvcMgrThread: status={}, subProc={}]".format( + self.getStatus(), self._tdeSubProcess) def getStatus(self): return self._status - def isStarting(self): - return self._status == Status.STATUS_STARTING - - def isRunning(self): - # return self._thread and self._thread.is_alive() - return self._status == Status.STATUS_RUNNING - - def isStopping(self): - return self._status == Status.STATUS_STOPPING - - def isStopped(self): - return self._status == Status.STATUS_STOPPED - - def isStable(self): - return self.isRunning() or self.isStopped() - # Start the thread (with sub process), and wait for the sub service # to become fully operational - def start(self): + def start(self, cmdLine): if self._thread: raise RuntimeError("Unexpected _thread") if self._tdeSubProcess: @@ -483,9 +529,9 @@ class ServiceManagerThread: Logging.info("Attempting to start TAOS service: {}".format(self)) - self._status = Status.STATUS_STARTING - self._tdeSubProcess = TdeSubProcess(self._tInst) - self._tdeSubProcess.start() + self._status.set(Status.STATUS_STARTING) + self._tdeSubProcess = TdeSubProcess() + self._tdeSubProcess.start(cmdLine) self._ipcQueue = Queue() self._thread = threading.Thread( # First thread captures server OUTPUT @@ -505,10 +551,11 @@ class ServiceManagerThread: time.sleep(1.0) # self.procIpcBatch() # don't pump message during start up print("_zz_", end="", flush=True) - if self._status == Status.STATUS_RUNNING: + if self._status.isRunning(): Logging.info("[] TDengine service READY to process requests") Logging.info("[] TAOS service started: {}".format(self)) - self._verifyDnode(self._tInst) # query and ensure dnode is ready + # self._verifyDnode(self._tInst) # query and ensure dnode is ready + # Logging.debug("[] TAOS Dnode verified: {}".format(self)) return # now we've started # TODO: handle failure-to-start better? self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output @@ -523,25 +570,27 @@ class ServiceManagerThread: # ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type isValid = False for col in cols: - print("col = {}".format(col)) + # print("col = {}".format(col)) ep = col[1].split(':') # 10.1.30.2:6030 - print("ep={}".format(ep)) + print("Found ep={}".format(ep)) if tInst.getPort() == int(ep[1]): # That's us - print("Valid Dnode matched!") + # print("Valid Dnode matched!") isValid = True # now we are valid break if not isValid: - raise RuntimeError("Failed to start Dnode, port = {}, expected: {}". - format(ep[1], tInst.getPort())) + print("Failed to start dnode, sleep for a while") + time.sleep(600) + raise RuntimeError("Failed to start Dnode, expected port not found: {}". + format(tInst.getPort())) dbc.close() def stop(self): # can be called from both main thread or signal handler print("Terminating TDengine service running as the sub process...") - if self.isStopped(): + if self.getStatus().isStopped(): print("Service already stopped") return - if self.isStopping(): + if self.getStatus().isStopping(): print("Service is already being stopped") return # Linux will send Control-C generated SIGINT to the TDengine process @@ -550,39 +599,42 @@ class ServiceManagerThread: if not self._tdeSubProcess: raise RuntimeError("sub process object missing") - self._status = Status.STATUS_STOPPING - retCode = self._tdeSubProcess.stop() - print("Attempted to stop sub process, got return code: {}".format(retCode)) - if (retCode==-11): # SGV - Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") - - if self._tdeSubProcess.isRunning(): # still running - print("FAILED to stop sub process, it is still running... pid = {}".format( + self._status.set(Status.STATUS_STOPPING) + # retCode = self._tdeSubProcess.stop() + try: + retCode = self._tdeSubProcess.stop() + # print("Attempted to stop sub process, got return code: {}".format(retCode)) + if retCode == signal.SIGSEGV : # SGV + Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") + except subprocess.TimeoutExpired as err: + print("Time out waiting for TDengine service process to exit") + else: + if self._tdeSubProcess.isRunning(): # still running, should now never happen + print("FAILED to stop sub process, it is still running... pid = {}".format( self._tdeSubProcess.getPid())) - else: - self._tdeSubProcess = None # not running any more - self.join() # stop the thread, change the status, etc. + else: + self._tdeSubProcess = None # not running any more + self.join() # stop the thread, change the status, etc. # Check if it's really stopped - outputLines = 20 # for last output - if self.isStopped(): + outputLines = 10 # for last output + if self.getStatus().isStopped(): self.procIpcBatch(outputLines) # one last time - print("End of TDengine Service Output: {}".format(self)) - print("----- TDengine Service (managed by SMT) is now terminated -----\n") + Logging.debug("End of TDengine Service Output: {}".format(self)) + Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n") else: print("WARNING: SMT did not terminate as expected: {}".format(self)) def join(self): # TODO: sanity check - if not self.isStopping(): + if not self.getStatus().isStopping(): raise RuntimeError( - "Unexpected status when ending svc mgr thread: {}".format( - self._status)) + "SMT.Join(): Unexpected status: {}".format(self._status)) if self._thread: self._thread.join() self._thread = None - self._status = Status.STATUS_STOPPED + self._status.set(Status.STATUS_STOPPED) # STD ERR thread self._thread2.join() self._thread2 = None @@ -651,25 +703,27 @@ class ServiceManagerThread: queue.put(line) self._printProgress("_i") - if self._status == Status.STATUS_STARTING: # we are starting, let's see if we have started + if self._status.isStarting(): # we are starting, let's see if we have started if line.find(self.TD_READY_MSG) != -1: # found Logging.info("Waiting for the service to become FULLY READY") time.sleep(1.0) # wait for the server to truly start. TODO: remove this - Logging.info("Service instance #{} is now FULLY READY".format(self._tInstNum)) - self._status = Status.STATUS_RUNNING + Logging.info("Service is now FULLY READY") # TODO: more ID info here? + self._status.set(Status.STATUS_RUNNING) # Trim the queue if necessary: TODO: try this 1 out of 10 times self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size - if self.isStopping(): # TODO: use thread status instead + if self._status.isStopping(): # TODO: use thread status instead # WAITING for stopping sub process to finish its outptu print("_w", end="", flush=True) # queue.put(line) # meaning sub process must have died - print("\nNo more output from IO thread managing TDengine service") + Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self)) out.close() def svcErrorReader(self, err: IO, queue): for line in iter(err.readline, b''): print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) + Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self)) + err.close() \ No newline at end of file