From 5d1d5cadc293b971404c6a230f2da6f16f1b9a0c Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 28 Apr 2021 08:36:56 +0000 Subject: [PATCH] Refactored crash_gen to have the TdeSubProcess own the SvcMgrThread object, also switched to Pylance --- .../python/linux/python3/taos/__init__.py | 1 + tests/pytest/crash_gen/__init__.py | 1 + tests/pytest/crash_gen/crash_gen_main.py | 167 ++++++------ tests/pytest/crash_gen/db.py | 39 +-- tests/pytest/crash_gen/misc.py | 17 +- tests/pytest/crash_gen/service_manager.py | 243 +++++++++++------- tests/pytest/crash_gen/settings.py | 26 +- tests/pytest/crash_gen/types.py | 5 + 8 files changed, 287 insertions(+), 212 deletions(-) create mode 100644 tests/pytest/crash_gen/types.py diff --git a/src/connector/python/linux/python3/taos/__init__.py b/src/connector/python/linux/python3/taos/__init__.py index 9732635738..1b086f36ec 100644 --- a/src/connector/python/linux/python3/taos/__init__.py +++ b/src/connector/python/linux/python3/taos/__init__.py @@ -1,6 +1,7 @@ from .connection import TDengineConnection from .cursor import TDengineCursor +from .error import Error # Globals threadsafety = 0 diff --git a/tests/pytest/crash_gen/__init__.py b/tests/pytest/crash_gen/__init__.py index 71855bc5a9..2509d7d76d 100644 --- a/tests/pytest/crash_gen/__init__.py +++ b/tests/pytest/crash_gen/__init__.py @@ -3,3 +3,4 @@ from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager from crash_gen.settings import Settings +from crash_gen.types import DirPath \ No newline at end of file diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 39475a32d0..8d34a7555b 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -15,7 +15,7 @@ # https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel from __future__ import annotations -from typing import Set +from typing import Any, Set, Tuple from typing import Dict from typing import List from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none @@ -57,8 +57,8 @@ if sys.version_info[0] < 3: # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace -gConfig: argparse.Namespace -gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection +# gConfig: argparse.Namespace +gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injection # logger: logging.Logger gContainer: Container @@ -81,20 +81,20 @@ class WorkerThread: self._stepGate = threading.Event() # Let us have a DB connection of our own - if (gConfig.per_thread_db_connection): # type: ignore + if (Settings.getConfig().per_thread_db_connection): # type: ignore # print("connector_type = {}".format(gConfig.connector_type)) tInst = gContainer.defTdeInstance - if gConfig.connector_type == 'native': + if Settings.getConfig().connector_type == 'native': self._dbConn = DbConn.createNative(tInst.getDbTarget()) - elif gConfig.connector_type == 'rest': + elif Settings.getConfig().connector_type == 'rest': self._dbConn = DbConn.createRest(tInst.getDbTarget()) - elif gConfig.connector_type == 'mixed': + elif Settings.getConfig().connector_type == 'mixed': if Dice.throw(2) == 0: # 1/2 chance self._dbConn = DbConn.createNative(tInst.getDbTarget()) else: self._dbConn = DbConn.createRest(tInst.getDbTarget()) else: - raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) + raise RuntimeError("Unexpected connector type: {}".format(Settings.getConfig().connector_type)) # self._dbInUse = False # if "use db" was executed already @@ -123,14 +123,14 @@ class WorkerThread: # self.isSleeping = False Logging.info("Starting to run thread: {}".format(self._tid)) - if (gConfig.per_thread_db_connection): # type: ignore + if (Settings.getConfig().per_thread_db_connection): # type: ignore Logging.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() # clean up - if (gConfig.per_thread_db_connection): # type: ignore + if (Settings.getConfig().per_thread_db_connection): # type: ignore if self._dbConn.isOpen: #sometimes it is not open self._dbConn.close() else: @@ -158,7 +158,7 @@ class WorkerThread: # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: - if (gConfig.per_thread_db_connection): # most likely TRUE + if (Settings.getConfig().per_thread_db_connection): # most likely TRUE if not self._dbConn.isOpen: # might have been closed during server auto-restart self._dbConn.open() # self.useDb() # might encounter exceptions. TODO: catch @@ -232,7 +232,7 @@ class WorkerThread: return self.getDbConn().getQueryResult() def getDbConn(self) -> DbConn : - if (gConfig.per_thread_db_connection): + if (Settings.getConfig().per_thread_db_connection): return self._dbConn else: return self._tc.getDbManager().getDbConn() @@ -254,7 +254,7 @@ class ThreadCoordinator: self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbManager = dbManager + self._dbManager = dbManager # type: Optional[DbManager] # may be freed self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -266,9 +266,13 @@ class ThreadCoordinator: self._stepStartTime = None # Track how long it takes to execute each step def getTaskExecutor(self): + if self._te is None: + raise CrashGenError("Unexpected empty TE") return self._te def getDbManager(self) -> DbManager: + if self._dbManager is None: + raise ChildProcessError("Unexpected empty _dbManager") return self._dbManager def crossStepBarrier(self, timeout=None): @@ -279,7 +283,7 @@ class ThreadCoordinator: self._execStats.registerFailure("User Interruption") def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): - maxSteps = gConfig.max_steps # type: ignore + maxSteps = Settings.getConfig().max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True if self._runStatus != Status.STATUS_RUNNING: @@ -384,7 +388,7 @@ class ThreadCoordinator: hasAbortedTask = False workerTimeout = False while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): - if not gConfig.debug: # print this only if we are not in debug mode + if not Settings.getConfig().debug: # print this only if we are not in debug mode Progress.emit(Progress.STEP_BOUNDARY) # print(".", end="", flush=True) # if (self._curStep % 2) == 0: # print memory usage once every 10 steps @@ -469,7 +473,7 @@ class ThreadCoordinator: self._pool = None self._te = None self._dbManager = None - self._executedTasks = None + self._executedTasks = [] self._lock = None self._stepBarrier = None self._execStats = None @@ -508,18 +512,18 @@ class ThreadCoordinator: ''' Initialize multiple databases, invoked at __ini__() time ''' self._dbs = [] # type: List[Database] dbc = self.getDbManager().getDbConn() - if gConfig.max_dbs == 0: + if Settings.getConfig().max_dbs == 0: self._dbs.append(Database(0, dbc)) else: baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic - )*333) % 888 if gConfig.dynamic_db_table_names else 0 - for i in range(gConfig.max_dbs): + )*333) % 888 if Settings.getConfig().dynamic_db_table_names else 0 + for i in range(Settings.getConfig().max_dbs): self._dbs.append(Database(baseDbNumber + i, dbc)) def pickDatabase(self): idxDb = 0 - if gConfig.max_dbs != 0 : - idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1 + if Settings.getConfig().max_dbs != 0 : + idxDb = Dice.throw(Settings.getConfig().max_dbs) # 0 to N-1 db = self._dbs[idxDb] # type: Database return db @@ -563,7 +567,7 @@ class ThreadPool: workerThread._thread.join() def cleanup(self): - self.threadList = None # maybe clean up each? + self.threadList = [] # maybe clean up each? # A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers # for new table names @@ -673,7 +677,7 @@ class AnyState: # Each sub state tells us the "info", about itself, so we can determine # on things like canDropDB() - def getInfo(self): + def getInfo(self) -> List[Any]: raise RuntimeError("Must be overriden by child classes") def equals(self, other): @@ -701,7 +705,7 @@ class AnyState: def canDropDb(self): # If user requests to run up to a number of DBs, # we'd then not do drop_db operations any more - if gConfig.max_dbs > 0 or gConfig.use_shadow_db : + if Settings.getConfig().max_dbs > 0 or Settings.getConfig().use_shadow_db : return False return self._info[self.CAN_DROP_DB] @@ -709,7 +713,7 @@ class AnyState: return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] def canDropFixedSuperTable(self): - if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table + if Settings.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] @@ -911,7 +915,7 @@ class StateMechine: # May be slow, use cautionsly... def getTaskTypes(self): # those that can run (directly/indirectly) from the current state - def typesToStrings(types): + def typesToStrings(types) -> List: ss = [] for t in types: ss.append(t.__name__) @@ -1030,13 +1034,14 @@ class StateMechine: # ref: # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ - def _weighted_choice_sub(self, weights): + def _weighted_choice_sub(self, weights) -> int: # TODO: use our dice to ensure it being determinstic? rnd = random.random() * sum(weights) for i, w in enumerate(weights): rnd -= w if rnd < 0: return i + raise CrashGenError("Unexpected no choice") class Database: ''' We use this to represent an actual TDengine database inside a service instance, @@ -1048,8 +1053,8 @@ class Database: ''' _clsLock = threading.Lock() # class wide lock _lastInt = 101 # next one is initial integer - _lastTick = 0 - _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions + _lastTick = None # Optional[datetime] + _lastLaggingTick = None # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc self._dbNum = dbNum # we assign a number to databases, for our testing purpose @@ -1114,14 +1119,14 @@ class Database: Fetch a timestamp tick, with some random factor, may not be unique. ''' with cls._clsLock: # prevent duplicate tick - if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized + if cls._lastLaggingTick is None or cls._lastTick is None : # not initialized # 10k at 1/20 chance, should be enough to avoid overlaps tick = cls.setupLastTick() cls._lastTick = tick cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast # if : # should be quite a bit into the future - if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick + if Settings.getConfig().mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence return cls._lastLaggingTick else: # regular @@ -1303,10 +1308,10 @@ class Task(): ]: return True # These are the ALWAYS-ACCEPTABLE ones # This case handled below already. - # elif (errno in [ 0x0B ]) and gConfig.auto_start_service: + # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service: # return True # We may get "network unavilable" when restarting service - elif gConfig.ignore_errors: # something is specified on command line - moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')] + elif Settings.getConfig().ignore_errors: # something is specified on command line + moreErrnos = [int(v, 0) for v in Settings.getConfig().ignore_errors.split(',')] if errno in moreErrnos: return True elif errno == 0x200 : # invalid SQL, we need to div in a bit more @@ -1342,7 +1347,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) - if (gConfig.continue_on_exception): # user choose to continue + if (Settings.getConfig().continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) self._err = err @@ -1357,7 +1362,7 @@ class Task(): self.__class__.__name__, errno2, err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) - if gConfig.debug: + if Settings.getConfig().debug: # raise # so that we see full stack traceback.print_exc() print( @@ -1422,11 +1427,11 @@ class Task(): class ExecutionStats: def __init__(self): # total/success times for a task - self._execTimes: Dict[str, [int, int]] = {} + self._execTimes: Dict[str, List[int]] = {} self._tasksInProgress = 0 self._lock = threading.Lock() - self._firstTaskStartTime = None - self._execStartTime = None + self._firstTaskStartTime = 0.0 + self._execStartTime = 0.0 self._errors = {} self._elapsedTime = 0.0 # total elapsed time self._accRunTime = 0.0 # accumulated run time @@ -1471,7 +1476,7 @@ class ExecutionStats: self._tasksInProgress -= 1 if self._tasksInProgress == 0: # all tasks have stopped self._accRunTime += (time.time() - self._firstTaskStartTime) - self._firstTaskStartTime = None + self._firstTaskStartTime = 0.0 def registerFailure(self, reason): self._failed = True @@ -1555,7 +1560,7 @@ class StateTransitionTask(Task): def getRegTableName(cls, i): if ( StateTransitionTask._baseTableNumber is None): # Set it one time StateTransitionTask._baseTableNumber = Dice.throw( - 999) if gConfig.dynamic_db_table_names else 0 + 999) if Settings.getConfig().dynamic_db_table_names else 0 return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) def execute(self, wt: WorkerThread): @@ -1575,14 +1580,14 @@ class TaskCreateDb(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # was: self.execWtSql(wt, "create database db") repStr = "" - if gConfig.num_replicas != 1: - # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N - numReplica = gConfig.num_replicas # fixed, always + if Settings.getConfig().num_replicas != 1: + # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N + numReplica = Settings.getConfig().num_replicas # fixed, always repStr = "replica {}".format(numReplica) - updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active + updatePostfix = "update 1" if Settings.getConfig().verify_data else "" # allow update only when "verify data" is active dbName = self._db.getName() self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) - if dbName == "db_0" and gConfig.use_shadow_db: + if dbName == "db_0" and Settings.getConfig().use_shadow_db: self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) class TaskDropDb(StateTransitionTask): @@ -1887,7 +1892,7 @@ class TaskDropSuperTable(StateTransitionTask): if Dice.throw(2) == 0: # print("_7_", end="", flush=True) tblSeq = list(range( - 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) + 2 + (self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES))) random.shuffle(tblSeq) tickOutput = False # if we have spitted out a "d" character for "drop regular table" isSuccess = True @@ -1953,13 +1958,13 @@ class TaskRestartService(StateTransitionTask): @classmethod def canBeginFrom(cls, state: AnyState): - if gConfig.auto_start_service: + if Settings.getConfig().auto_start_service: return state.canDropFixedSuperTable() # Basicallly when we have the super table return False # don't run this otherwise CHANCE_TO_RESTART_SERVICE = 200 def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - if not gConfig.auto_start_service: # only execute when we are in -a mode + if not Settings.getConfig().auto_start_service: # only execute when we are in -a mode print("_a", end="", flush=True) return @@ -1981,12 +1986,12 @@ class TaskAddData(StateTransitionTask): activeTable: Set[int] = set() # We use these two files to record operations to DB, useful for power-off tests - fAddLogReady = None # type: io.TextIOWrapper - fAddLogDone = None # type: io.TextIOWrapper + fAddLogReady = None # type: Optional[io.TextIOWrapper] + fAddLogDone = None # type: Optional[io.TextIOWrapper] @classmethod def prepToRecordOps(cls): - if gConfig.record_ops: + if Settings.getConfig().record_ops: if (cls.fAddLogReady is None): Logging.info( "Recording in a file operations to be performed...") @@ -2004,7 +2009,7 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS fullTableName = db.getName() + '.' + regTableName sql = "INSERT INTO {} VALUES ".format(fullTableName) @@ -2016,21 +2021,23 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS for j in range(numRecords): # number of records per table nextInt = db.getNextInt() nextTick = db.getNextTick() nextColor = db.getNextColor() - if gConfig.record_ops: + if Settings.getConfig().record_ops: self.prepToRecordOps() + if self.fAddLogReady is None: + raise CrashGenError("Unexpected empty fAddLogReady") self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady.fileno()) # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName - if gConfig.verify_data: + if Settings.getConfig().verify_data: self.lockTable(fullTableName) # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written @@ -2043,7 +2050,7 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) # Quick hack, attach an update statement here. TODO: create an "update" task - if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + if (not Settings.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB nextInt = db.getNextInt() nextColor = db.getNextColor() sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here @@ -2054,12 +2061,12 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) except: # Any exception at all - if gConfig.verify_data: + if Settings.getConfig().verify_data: self.unlockTable(fullTableName) raise # Now read it back and verify, we might encounter an error if table is dropped - if gConfig.verify_data: # only if command line asks for it + if Settings.getConfig().verify_data: # only if command line asks for it try: readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) @@ -2086,7 +2093,9 @@ class TaskAddData(StateTransitionTask): # Successfully wrote the data into the DB, let's record it somehow te.recordDataMark(nextInt) - if gConfig.record_ops: + if Settings.getConfig().record_ops: + if self.fAddLogDone is None: + raise CrashGenError("Unexpected empty fAddLogDone") self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() os.fsync(self.fAddLogDone.fileno()) @@ -2095,8 +2104,8 @@ class TaskAddData(StateTransitionTask): # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access db = self._db dbc = wt.getDbConn() - numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numTables = self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES + numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS tblSeq = list(range(numTables )) random.shuffle(tblSeq) # now we have random sequence for i in tblSeq: @@ -2126,7 +2135,9 @@ class ThreadStacks: # stack info for all threads def __init__(self): self._allStacks = {} allFrames = sys._current_frames() - for th in threading.enumerate(): + for th in threading.enumerate(): + if th.ident is None: + continue stack = traceback.extract_stack(allFrames[th.ident]) self._allStacks[th.native_id] = stack @@ -2247,14 +2258,15 @@ class ClientManager: def run(self, svcMgr): # self._printLastNumbers() - global gConfig + # global gConfig # Prepare Tde Instance global gContainer tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" - dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function - thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + cfg = Settings.getConfig() + dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function + thPool = ThreadPool(cfg.num_threads, cfg.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) Logging.info("Starting client instance: {}".format(tInst)) @@ -2267,7 +2279,8 @@ class ClientManager: # Release global variables - gConfig = None + # gConfig = None + Settings.clearConfig() gSvcMgr = None logger = None @@ -2298,7 +2311,7 @@ class ClientManager: class MainExec: def __init__(self): self._clientMgr = None - self._svcMgr = None # type: ServiceManager + self._svcMgr = None # type: Optional[ServiceManager] signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler) @@ -2318,7 +2331,7 @@ class MainExec: def runClient(self): global gSvcMgr - if gConfig.auto_start_service: + if Settings.getConfig().auto_start_service: gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert gSvcMgr.startTaosServices() # we start, don't run @@ -2327,13 +2340,13 @@ class MainExec: try: ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside except requests.exceptions.ConnectionError as err: - Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) + Logging.warning("Failed to open REST connection to DB: {}".format(err)) # don't raise return ret def runService(self): global gSvcMgr - gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert + gSvcMgr = self._svcMgr = ServiceManager(Settings.getConfig().num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr.run() # run to some end state gSvcMgr = self._svcMgr = None @@ -2467,20 +2480,20 @@ class MainExec: action='store_true', help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') - global gConfig - gConfig = parser.parse_args() - Settings.setConfig(gConfig) # TODO: fix this hack, consolidate this global var + # global gConfig + config = parser.parse_args() + Settings.setConfig(config) # TODO: fix this hack, consolidate this global var # Sanity check for arguments - if gConfig.use_shadow_db and gConfig.max_dbs>1 : + if Settings.getConfig().use_shadow_db and Settings.getConfig().max_dbs>1 : raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") - Logging.clsInit(gConfig) + Logging.clsInit(Settings.getConfig()) Dice.seed(0) # initial seeding of dice def run(self): - if gConfig.run_tdengine: # run server + if Settings.getConfig().run_tdengine: # run server try: self.runService() return 0 # success diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py index 62a369c41a..6c85da0051 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/db.py @@ -5,6 +5,7 @@ import time import threading import requests from requests.auth import HTTPBasicAuth +from crash_gen.types import QueryResult import taos from util.sql import * @@ -18,7 +19,7 @@ import datetime import traceback # from .service_manager import TdeInstance -import crash_gen.settings +from crash_gen.settings import Settings class DbConn: TYPE_NATIVE = "native-c" @@ -79,7 +80,7 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") nRows = self.query(sql) if nRows != 1: - raise taos.error.ProgrammingError( + raise CrashGenError( "Unexpected result for query: {}, rows = {}".format(sql, nRows), (CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT) ) @@ -115,7 +116,7 @@ class DbConn: try: self.execute(sql) return True # ignore num of results, return success - except taos.error.ProgrammingError as err: + except taos.error.Error as err: return False # failed, for whatever TAOS reason # Not possile to reach here, non-TAOS exception would have been thrown @@ -126,7 +127,7 @@ class DbConn: def openByType(self): raise RuntimeError("Unexpected execution, should be overriden") - def getQueryResult(self): + def getQueryResult(self) -> QueryResult : raise RuntimeError("Unexpected execution, should be overriden") def getResultRows(self): @@ -221,7 +222,7 @@ class DbConnRest(DbConn): class MyTDSql: # Class variables _clsLock = threading.Lock() # class wide locking - longestQuery = None # type: str + longestQuery = '' # type: str longestQueryTime = 0.0 # seconds lqStartTime = 0.0 # lqEndTime = 0.0 # Not needed, as we have the two above already @@ -261,7 +262,7 @@ class MyTDSql: cls.lqStartTime = startTime # Now write to the shadow database - if crash_gen.settings.gConfig.use_shadow_db: + if Settings.getConfig().use_shadow_db: if sql[:11] == "INSERT INTO": if sql[:16] == "INSERT INTO db_0": sql2 = "INSERT INTO db_s" + sql[16:] @@ -453,31 +454,11 @@ class DbManager(): ''' Release the underlying DB connection upon deletion of DbManager ''' self.cleanUp() - def getDbConn(self): + def getDbConn(self) -> DbConn : + if self._dbConn is None: + raise CrashGenError("Unexpected empty DbConn") return self._dbConn - # TODO: not used any more, to delete - def pickAndAllocateTable(self): # pick any table, and "use" it - return self.tableNumQueue.pickAndAllocate() - - # TODO: Not used any more, to delete - def addTable(self): - with self._lock: - tIndex = self.tableNumQueue.push() - return tIndex - - # Not used any more, to delete - def releaseTable(self, i): # return the table back, so others can use it - self.tableNumQueue.release(i) - - # TODO: not used any more, delete - def getTableNameToDelete(self): - tblNum = self.tableNumQueue.pop() # TODO: race condition! - if (not tblNum): # maybe false - return False - - return "table_{}".format(tblNum) - def cleanUp(self): if self._dbConn: self._dbConn.close() diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 9774ec5455..cb98bc7c5b 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -3,6 +3,7 @@ import random import logging import os import sys +from typing import Optional import taos @@ -39,11 +40,11 @@ class MyLoggingAdapter(logging.LoggerAdapter): class Logging: - logger = None + logger = None # type: Optional[MyLoggingAdapter] @classmethod def getLogger(cls): - return logger + return cls.logger @classmethod def clsInit(cls, gConfig): # TODO: refactor away gConfig @@ -60,7 +61,7 @@ class Logging: # Logging adapter, to be used as a logger # print("setting logger variable") # global logger - cls.logger = MyLoggingAdapter(_logger, []) + cls.logger = MyLoggingAdapter(_logger, {}) if (gConfig.debug): cls.logger.setLevel(logging.DEBUG) # default seems to be INFO @@ -84,6 +85,7 @@ class Logging: cls.logger.error(msg) class Status: + STATUS_EMPTY = 99 STATUS_STARTING = 1 STATUS_RUNNING = 2 STATUS_STOPPING = 3 @@ -95,12 +97,16 @@ class Status: def __repr__(self): return "[Status: v={}]".format(self._status) - def set(self, status): + def set(self, status: int): self._status = status def get(self): return self._status + def isEmpty(self): + ''' Empty/Undefined ''' + return self._status == Status.STATUS_EMPTY + def isStarting(self): return self._status == Status.STATUS_STARTING @@ -117,6 +123,9 @@ class Status: def isStable(self): return self.isRunning() or self.isStopped() + def isActive(self): + return self.isStarting() or self.isRunning() or self.isStopping() + # Deterministic random number generator class Dice(): seeded = False # static, uninitialized diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 146215f2bc..4afd7e4c78 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -3,12 +3,12 @@ from __future__ import annotations import os import io import sys +from enum import Enum import threading import signal import logging import time from subprocess import PIPE, Popen, TimeoutExpired - from typing import IO, List, NewType, Optional try: @@ -16,12 +16,12 @@ try: except: print("Psutil module needed, please install: sudo pip3 install psutil") sys.exit(-1) - from queue import Queue, Empty -from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from .db import DbConn, DbTarget -import crash_gen.settings +from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status +from crash_gen.db import DbConn, DbTarget +from crash_gen.settings import Settings +from crash_gen.types import DirPath class TdeInstance(): """ @@ -70,7 +70,10 @@ class TdeInstance(): self._fepPort = fepPort self._tInstNum = tInstNum - self._smThread = ServiceManagerThread() + + # An "Tde Instance" will *contain* a "sub process" object, with will/may use a thread internally + # self._smThread = ServiceManagerThread() + self._subProcess = None # type: Optional[TdeSubProcess] def getDbTarget(self): return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) @@ -155,21 +158,21 @@ quorum 2 def getExecFile(self): # .../taosd return self._buildDir + "/build/bin/taosd" - def getRunDir(self): # TODO: rename to "root dir" ?! - return self._buildDir + self._subdir + def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?! + return DirPath(self._buildDir + self._subdir) - def getCfgDir(self): # path, not file - return self.getRunDir() + "/cfg" + def getCfgDir(self) -> DirPath : # path, not file + return DirPath(self.getRunDir() + "/cfg") - def getLogDir(self): - return self.getRunDir() + "/log" + def getLogDir(self) -> DirPath : + return DirPath(self.getRunDir() + "/log") def getHostAddr(self): return "127.0.0.1" def getServiceCmdLine(self): # to start the instance cmdLine = [] - if crash_gen.settings.gConfig.track_memory_leaks: + if Settings.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") cmdLine = ['valgrind', '--leak-check=yes'] # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control @@ -199,27 +202,46 @@ quorum 2 dbc.close() def getStatus(self): - return self._smThread.getStatus() + # return self._smThread.getStatus() + if self._subProcess is None: + return Status(Status.STATUS_EMPTY) + return self._subProcess.getStatus() - def getSmThread(self): - return self._smThread + # def getSmThread(self): + # return self._smThread def start(self): - if not self.getStatus().isStopped(): + if self.getStatus().isActive(): raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus())) Logging.info("Starting TDengine instance: {}".format(self)) self.generateCfgFile() # service side generates config file, client does not self.rotateLogs() - self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions + # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions + self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir()) def stop(self): - self._smThread.stop() + self._subProcess.stop() + self._subProcess = None def isFirst(self): return self._tInstNum == 0 + def printFirst10Lines(self): + if self._subProcess is None: + Logging.warning("Incorrect TI status for procIpcBatch-10 operation") + return + self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True) + + def procIpcBatch(self): + if self._subProcess is None: + Logging.warning("Incorrect TI status for procIpcBatch operation") + return + self._subProcess.procIpcBatch() # may enounter EOF and change status to STOPPED + if self._subProcess.getStatus().isStopped(): + self._subProcess.stop() + self._subProcess = None class TdeSubProcess: """ @@ -237,16 +259,21 @@ class TdeSubProcess: # RET_TIME_OUT = -3 # RET_SUCCESS = -4 - def __init__(self, po: Popen): - self._popen = po # type: Popen - # if tInst is None: - # raise CrashGenError("Empty instance not allowed in TdeSubProcess") - # self._tInst = tInst # Default create at ServiceManagerThread + def __init__(self, cmdLine: List[str], logDir: DirPath): + # Create the process + managing thread immediately + + Logging.info("Attempting to start TAOS sub process...") + self._popen = self._start(cmdLine) # the actual sub process + self._smThread = ServiceManagerThread(self, logDir) # A thread to manage the sub process, mostly to process the IO + Logging.info("Successfully started TAOS process: {}".format(self)) + + def __repr__(self): # if self.subProcess is None: # return '[TdeSubProc: Empty]' - return '[TdeSubProc: pid = {}]'.format(self.getPid()) + return '[TdeSubProc: pid = {}, status = {}]'.format( + self.getPid(), self.getStatus() ) def getStdOut(self): return self._popen.stdout @@ -261,14 +288,14 @@ class TdeSubProcess: def getPid(self): return self._popen.pid - @classmethod - def start(cls, cmdLine): + def _start(self, cmdLine) -> Popen : ON_POSIX = 'posix' in sys.builtin_module_names # Sanity check # if self.subProcess: # already there # raise RuntimeError("Corrupt process state") + # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment myEnv = os.environ.copy() @@ -279,7 +306,7 @@ class TdeSubProcess: # print("Starting TDengine via Shell: {}".format(cmdLineStr)) # useShell = True # Needed to pass environments into it - popen = Popen( + return Popen( ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, shell=True, # Always use shell, since we need to pass ENV vars stdout=PIPE, @@ -287,15 +314,15 @@ class TdeSubProcess: close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF - return cls(popen) STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm - @classmethod - def stop(cls, tsp: TdeSubProcess): + def stop(self): """ - Stop a sub process, DO NOT return anything, process all conditions INSIDE + Stop a sub process, DO NOT return anything, process all conditions INSIDE. + + Calling function should immediately delete/unreference the object Common POSIX signal values (from man -7 signal): SIGHUP 1 @@ -315,11 +342,17 @@ class TdeSubProcess: """ # self._popen should always be valid. - # if not self.subProcess: - # Logging.error("Sub process already stopped") - # return + Logging.info("Terminating TDengine service running as the sub process...") + if self.getStatus().isStopped(): + Logging.info("Service already stopped") + return + if self.getStatus().isStopping(): + Logging.info("Service is already being stopped, pid: {}".format(self.getPid())) + return + + self.setStatus(Status.STATUS_STOPPING) - retCode = tsp._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N) + retCode = self._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N) if retCode: # valid return code, process ended # retCode = -retCode # only if valid Logging.warning("TSP.stop(): process ended itself") @@ -327,9 +360,12 @@ class TdeSubProcess: return # process still alive, let's interrupt it - cls._stopForSure(tsp._popen, cls.STOP_SIGNAL) # success if no exception + self._stopForSure(self._popen, self.STOP_SIGNAL) # success if no exception - # sub process should end, then IPC queue should end, causing IO thread to end + # sub process should end, then IPC queue should end, causing IO thread to end + self._smThread.stop() # stop for sure too + + self.setStatus(Status.STATUS_STOPPED) @classmethod def _stopForSure(cls, proc: Popen, sig: int): @@ -357,13 +393,13 @@ class TdeSubProcess: Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig)) child.send_signal(sig) try: - retCode = child.wait(20) - if (- retCode) == signal.SIGSEGV: # Crashed + retCode = child.wait(20) # type: ignore + if (- retCode) == signal.SIGSEGV: # type: ignore # Crashed Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid)) - elif (- retCode) == sig : + elif (- retCode) == sig : # type: ignore Logging.info("Sub-sub process terminated with expected return code {}".format(sig)) else: - Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) + Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) # type: ignore return True # terminated successfully except psutil.TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) @@ -408,6 +444,15 @@ class TdeSubProcess: return raise CrashGenError("Failed to stop process, pid={}".format(pid)) + def getStatus(self): + return self._smThread.getStatus() + + def setStatus(self, status): + self._smThread.setStatus(status) + + def procIpcBatch(self, trimToTarget=0, forceOutput=False): + self._smThread.procIpcBatch(trimToTarget, forceOutput) + class ServiceManager: PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process @@ -504,10 +549,10 @@ class ServiceManager: def isActive(self): """ Determine if the service/cluster is active at all, i.e. at least - one thread is not "stopped". + one instance is active """ for ti in self._tInsts: - if not ti.getStatus().isStopped(): + if ti.getStatus().isActive(): return True return False @@ -545,10 +590,10 @@ class ServiceManager: # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here status = ti.getStatus() if status.isRunning(): - th = ti.getSmThread() - th.procIpcBatch() # regular processing, + # th = ti.getSmThread() + ti.procIpcBatch() # regular processing, if status.isStopped(): - th.procIpcBatch() # one last time? + ti.procIpcBatch() # one last time? # self._updateThreadStatus() time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round @@ -578,7 +623,8 @@ class ServiceManager: if not ti.isFirst(): tFirst = self._getFirstInstance() tFirst.createDnode(ti.getDbTarget()) - ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines + ti.printFirst10Lines() + # ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines def stopTaosServices(self): with self._lock: @@ -624,21 +670,24 @@ class ServiceManagerThread: """ MAX_QUEUE_SIZE = 10000 - def __init__(self): + def __init__(self, subProc: TdeSubProcess, logDir: str): # Set the sub process - self._tdeSubProcess = None # type: TdeSubProcess + # self._tdeSubProcess = None # type: TdeSubProcess # Arrange the TDengine instance # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based # self._tInst = tInst or TdeInstance() # Need an instance - self._thread = None # The actual thread, # type: threading.Thread - self._thread2 = None # watching stderr + # self._thread = None # type: Optional[threading.Thread] # The actual thread, # type: threading.Thread + # self._thread2 = None # type: Optional[threading.Thread] Thread # watching stderr self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually. + self._start(subProc, logDir) + def __repr__(self): - return "[SvcMgrThread: status={}, subProc={}]".format( - self.getStatus(), self._tdeSubProcess) + raise CrashGenError("SMT status moved to TdeSubProcess") + # return "[SvcMgrThread: status={}, subProc={}]".format( + # self.getStatus(), self._tdeSubProcess) def getStatus(self): ''' @@ -646,29 +695,33 @@ class ServiceManagerThread: ''' return self._status + def setStatus(self, statusVal: int): + self._status.set(statusVal) + # Start the thread (with sub process), and wait for the sub service # to become fully operational - def start(self, cmdLine : str, logDir: str): + def _start(self, subProc :TdeSubProcess, logDir: str): ''' Request the manager thread to start a new sub process, and manage it. :param cmdLine: the command line to invoke :param logDir: the logging directory, to hold stdout/stderr files ''' - if self._thread: - raise RuntimeError("Unexpected _thread") - if self._tdeSubProcess: - raise RuntimeError("TDengine sub process already created/running") + # if self._thread: + # raise RuntimeError("Unexpected _thread") + # if self._tdeSubProcess: + # raise RuntimeError("TDengine sub process already created/running") - Logging.info("Attempting to start TAOS service: {}".format(self)) + # Moved to TdeSubProcess + # Logging.info("Attempting to start TAOS service: {}".format(self)) self._status.set(Status.STATUS_STARTING) - self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running + # self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running self._ipcQueue = Queue() # type: Queue self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, - args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) + args=(subProc.getStdOut(), self._ipcQueue, logDir)) self._thread.daemon = True # thread dies with the program self._thread.start() time.sleep(0.01) @@ -680,7 +733,7 @@ class ServiceManagerThread: self._thread2 = threading.Thread( # 2nd thread captures server ERRORs target=self.svcErrorReader, - args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir)) + args=(subProc.getStdErr(), self._ipcQueue, logDir)) self._thread2.daemon = True # thread dies with the program self._thread2.start() time.sleep(0.01) @@ -695,14 +748,14 @@ class ServiceManagerThread: Progress.emit(Progress.SERVICE_START_NAP) # print("_zz_", end="", flush=True) if self._status.isRunning(): - Logging.info("[] TDengine service READY to process requests") - Logging.info("[] TAOS service started: {}".format(self)) + Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid())) + # Logging.info("[] TAOS service started: {}".format(self)) # self._verifyDnode(self._tInst) # query and ensure dnode is ready # Logging.debug("[] TAOS Dnode verified: {}".format(self)) return # now we've started # TODO: handle failure-to-start better? self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output - raise RuntimeError("TDengine service did not start successfully: {}".format(self)) + raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid())) def _verifyDnode(self, tInst: TdeInstance): dbc = DbConn.createNative(tInst.getDbTarget()) @@ -722,29 +775,23 @@ class ServiceManagerThread: break if not isValid: print("Failed to start dnode, sleep for a while") - time.sleep(600) + time.sleep(10.0) raise RuntimeError("Failed to start Dnode, expected port not found: {}". format(tInst.getPort())) dbc.close() def stop(self): # can be called from both main thread or signal handler - Logging.info("Terminating TDengine service running as the sub process...") - if self.getStatus().isStopped(): - Logging.info("Service already stopped") - return - if self.getStatus().isStopping(): - Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid())) - return + # Linux will send Control-C generated SIGINT to the TDengine process # already, ref: # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes - if not self._tdeSubProcess: - raise RuntimeError("sub process object missing") + # if not self._tdeSubProcess: + # raise RuntimeError("sub process object missing") - self._status.set(Status.STATUS_STOPPING) - TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what - self._tdeSubProcess = None + # self._status.set(Status.STATUS_STOPPING) + # TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what + # self._tdeSubProcess = None # if not self._tdeSubProcess.stop(): # everything withing # if self._tdeSubProcess.isRunning(): # still running, should now never happen # Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( @@ -757,29 +804,28 @@ class ServiceManagerThread: outputLines = 10 # for last output if self.getStatus().isStopped(): self.procIpcBatch(outputLines) # one last time - Logging.debug("End of TDengine Service Output: {}".format(self)) + Logging.debug("End of TDengine Service Output") Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n") else: - print("WARNING: SMT did not terminate as expected: {}".format(self)) + print("WARNING: SMT did not terminate as expected") def join(self): # TODO: sanity check - if not self.getStatus().isStopping(): + s = self.getStatus() + if s.isStopping() or s.isStopped(): # we may be stopping ourselves, or have been stopped/killed by others + if self._thread or self._thread2 : + if self._thread: + self._thread.join() + self._thread = None + if self._thread2: # STD ERR thread + self._thread2.join() + self._thread2 = None + else: + Logging.warning("Joining empty thread, doing nothing") + else: raise RuntimeError( "SMT.Join(): Unexpected status: {}".format(self._status)) - if self._thread or self._thread2 : - if self._thread: - self._thread.join() - self._thread = None - if self._thread2: # STD ERR thread - self._thread2.join() - self._thread2 = None - else: - print("Joining empty thread, doing nothing") - - self._status.set(Status.STATUS_STOPPED) - def _trimQueue(self, targetSize): if targetSize <= 0: return # do nothing @@ -798,6 +844,10 @@ class ServiceManagerThread: TD_READY_MSG = "TDengine is initialized successfully" def procIpcBatch(self, trimToTarget=0, forceOutput=False): + ''' + Process a batch of STDOUT/STDERR data, until we read EMPTY from + the pipe. + ''' self._trimQueue(trimToTarget) # trim if necessary # Process all the output generated by the underlying sub process, # managed by IO thread @@ -887,7 +937,8 @@ class ServiceManagerThread: # queue.put(line) # meaning sub process must have died - Logging.info("EOF for TDengine STDOUT: {}".format(self)) + Logging.info("EOF found TDengine STDOUT, marking the process as terminated") + self.setStatus(Status.STATUS_STOPPED) out.close() # Close the stream fOut.close() # Close the output file @@ -898,6 +949,6 @@ class ServiceManagerThread: for line in iter(err.readline, b''): fErr.write(line) Logging.info("TDengine STDERR: {}".format(line)) - Logging.info("EOF for TDengine STDERR: {}".format(self)) + Logging.info("EOF for TDengine STDERR") err.close() fErr.close() \ No newline at end of file diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py index ae0132378d..82214930b4 100644 --- a/tests/pytest/crash_gen/settings.py +++ b/tests/pytest/crash_gen/settings.py @@ -1,15 +1,29 @@ from __future__ import annotations import argparse +from typing import Optional -gConfig: argparse.Namespace +from crash_gen.misc import CrashGenError + +# gConfig: Optional[argparse.Namespace] class Settings: + _config = None # type Optional[argparse.Namespace] + @classmethod def init(cls): - global gConfig - gConfig = [] + cls._config = None + + @classmethod + def setConfig(cls, config: argparse.Namespace): + cls._config = config + + @classmethod + # TODO: check items instead of exposing everything + def getConfig(cls) -> argparse.Namespace: + if cls._config is None: + raise CrashGenError("invalid state") + return cls._config @classmethod - def setConfig(cls, config): - global gConfig - gConfig = config \ No newline at end of file + def clearConfig(cls): + cls._config = None \ No newline at end of file diff --git a/tests/pytest/crash_gen/types.py b/tests/pytest/crash_gen/types.py new file mode 100644 index 0000000000..fdf8ea2478 --- /dev/null +++ b/tests/pytest/crash_gen/types.py @@ -0,0 +1,5 @@ +from typing import Any, List, NewType + +DirPath = NewType('DirPath', str) + +QueryResult = NewType('QueryResult', List[List[Any]]) \ No newline at end of file -- GitLab