diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index bd2038ca3abfbd9c34f8327e601bcae705a4d7e0..9d3c66b97f12aea7e8685a923f26599d378ac8a5 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h import sys import os +import io +import signal import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -36,6 +38,8 @@ from requests.auth import HTTPBasicAuth from typing import List from typing import Dict from typing import Set +from typing import IO +from queue import Queue, Empty from util.log import * from util.dnodes import * @@ -205,6 +209,7 @@ class WorkerThread: # else: # return self._tc.getDbState().getDbConn().query(sql) +# The coordinator of all worker threads, mostly running in main thread class ThreadCoordinator: def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 @@ -217,6 +222,7 @@ class ThreadCoordinator: self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads self._execStats = ExecutionStats() + self._runStatus = MainExec.STATUS_RUNNING def getTaskExecutor(self): return self._te @@ -227,6 +233,10 @@ class ThreadCoordinator: def crossStepBarrier(self): self._stepBarrier.wait() + def requestToStop(self): + self._runStatus = MainExec.STATUS_STOPPING + self._execStats.registerFailure("User Interruption") + def run(self): self._pool.createAndStartThreads(self) @@ -234,41 +244,56 @@ class ThreadCoordinator: self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore self._execStats.startExec() # start the stop watch - failed = False - while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 + transitionFailed = False + hasAbortedTask = False + while(self._curStep < maxSteps-1 and + (not transitionFailed) and + (self._runStatus==MainExec.STATUS_RUNNING) and + (not hasAbortedTask)): # maxStep==10, last curStep should be 9 + if not gConfig.debug: print(".", end="", flush=True) # print this only if we are not in debug mode logger.debug("[TRD] Main thread going to sleep") - # Now ready to enter a step + # Now main thread (that's us) is ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - sm.transition(self._executedTasks) # at end of step, transiton the DB state - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads - if sm.hasDatabase() : - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() - # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread - - except taos.error.ProgrammingError as err: - if ( err.msg == 'network unavailable' ): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - failed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at end, and maybe signal them to stop - else: - raise - finally: - pass + # We use this period to do house keeping work, when all worker threads are QUIET. + hasAbortedTask = False + for task in self._executedTasks : + if task.isAborted() : + print("Task aborted: {}".format(task)) + hasAbortedTask = True + break + + if hasAbortedTask : # do transition only if tasks are error free + self._execStats.registerFailure("Aborted Task Encountered") + else: + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + sm.transition(self._executedTasks) # at end of step, transiton the DB state + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads + if sm.hasDatabase() : + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + # finally: + # pass self.resetExecutedTasks() # clear the tasks after we are done @@ -278,14 +303,14 @@ class ThreadCoordinator: logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - if not failed: # only if not failed + if not transitionFailed: # only if not failed self._te = TaskExecutor(self._curStep) logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep - self.tapAllThreads() + self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task logger.debug("Main thread ready to finish up...") - if not failed: # only in regular situations + if not transitionFailed: # only in regular situations self.crossStepBarrier() # Cross it one last time, after all threads finish self._stepBarrier.reset() logger.debug("Main thread in exclusive zone...") @@ -298,8 +323,8 @@ class ThreadCoordinator: logger.info("\nAll worker threads finished") self._execStats.endExec() - def logStats(self): - self._execStats.logStats() + def printStats(self): + self._execStats.printStats() def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -1061,15 +1086,60 @@ class DbManager(): self._dbConn.close() class TaskExecutor(): + class BoundedList: + def __init__(self, size = 10): + self._size = size + self._list = [] + + def add(self, n: int) : + if not self._list: # empty + self._list.append(n) + return + # now we should insert + nItems = len(self._list) + insPos = 0 + for i in range(nItems): + insPos = i + if n <= self._list[i] : # smaller than this item, time to insert + break # found the insertion point + insPos += 1 # insert to the right + + if insPos == 0 : # except for the 1st item, # TODO: elimiate first item as gating item + return # do nothing + + # print("Inserting at postion {}, value: {}".format(insPos, n)) + self._list.insert(insPos, n) # insert + + newLen = len(self._list) + if newLen <= self._size : + return # do nothing + elif newLen == (self._size + 1) : + del self._list[0] # remove the first item + else : + raise RuntimeError("Corrupt Bounded List") + + def __str__(self): + return repr(self._list) + + _boundedList = BoundedList() + def __init__(self, curStep): self._curStep = curStep + @classmethod + def getBoundedList(cls): + return cls._boundedList + def getCurStep(self): return self._curStep def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread task.execute(wt) + def recordDataMark(self, n: int): + # print("[{}]".format(n), end="", flush=True) + self._boundedList.add(n) + # def logInfo(self, msg): # logger.info(" T[{}.x]: ".format(self._curStep) + msg) @@ -1089,6 +1159,7 @@ class Task(): self._dbManager = dbManager self._workerThread = None self._err = None + self._aborted = False self._curStep = None self._numRows = None # Number of rows affected @@ -1102,6 +1173,9 @@ class Task(): def isSuccess(self): return self._err == None + def isAborted(self): + return self._aborted + def clone(self): # TODO: why do we need this again? newTask = self.__class__(self._dbManager, self._execStats) return newTask @@ -1143,7 +1217,9 @@ class Task(): else: # non-debug print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + "----------------------------\n") - sys.exit(-1) + # sys.exit(-1) + self._err = err + self._aborted = True except: self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise @@ -1213,7 +1289,7 @@ class ExecutionStats: self._failed = True self._failureReason = reason - def logStats(self): + def printStats(self): logger.info("----------------------------------------------------------------------") logger.info("| Crash_Gen test {}, with the following stats:". format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) @@ -1228,6 +1304,7 @@ class ExecutionStats: logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) logger.info("----------------------------------------------------------------------") @@ -1449,6 +1526,8 @@ class TaskAddData(StateTransitionTask): ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) self.execWtSql(wt, sql) + # Successfully wrote the data into the DB, let's record it somehow + te.recordDataMark(nextInt) if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() @@ -1528,23 +1607,152 @@ class MyLoggingAdapter(logging.LoggerAdapter): return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs -class MainExec: - @classmethod - def runClient(cls): - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! +class SvcManager: + + def __init__(self): + print("Starting service manager") + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + self.ioThread = None + self.subProcess = None + self.shouldStop = False + self.status = MainExec.STATUS_RUNNING + + def svcOutputReader(self, out: IO, queue): + # print("This is the svcOutput Reader...") + for line in out : # iter(out.readline, b''): + # print("Finished reading a line: {}".format(line)) + queue.put(line.rstrip()) # get rid of new lines + print("No more output from incoming IO") # meaning sub process must have died + out.close() + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.subProcess.send_signal(signal.SIGINT) + self.shouldStop = True + self.joinIoThread() + + def joinIoThread(self): + if self.ioThread : + self.ioThread.join() + self.ioThread = None + + def run(self): + ON_POSIX = 'posix' in sys.builtin_module_names + svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] + # svcCmd = ['vmstat', '1'] + self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True) + q = Queue() + self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q)) + self.ioThread.daemon = True # thread dies with the program + self.ioThread.start() + + # proc = subprocess.Popen(['echo', '"to stdout"'], + # stdout=subprocess.PIPE, + # ) + # stdout_value = proc.communicate()[0] + # print('\tstdout: {}'.format(repr(stdout_value))) + + while True : + try: + line = q.get_nowait() # getting output at fast speed + except Empty: + # print('no output yet') + time.sleep(2.3) # wait only if there's no output + else: # got line + print(line) + # print("----end of iteration----") + if self.shouldStop: + print("Ending main Svc thread") + break + + print("end of loop") + + self.joinIoThread() + print("Finished") + +class ClientManager: + def __init__(self): + print("Starting service manager") + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + + self.status = MainExec.STATUS_RUNNING + self.tc = None + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.tc.requestToStop() + + def _printLastNumbers(self): # to verify data durability + dbManager = DbManager(resetDb=False) + dbc = dbManager.getDbConn() + if dbc.query("show databases") == 0 : # no databae + return + + dbc.execute("use db") + sTbName = dbManager.getFixedSuperTableName() + + # get all regular tables + dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + rTables = dbc.getQueryResult() + + bList = TaskExecutor.BoundedList() + for rTbName in rTables : # regular tables + dbc.query("select speed from db.{}".format(rTbName[0])) + numbers = dbc.getQueryResult() + for row in numbers : + # print("<{}>".format(n), end="", flush=True) + bList.add(row[0]) + + print("Top numbers in DB right now: {}".format(bList)) + print("TDengine client execution is about to start in 2 seconds...") + time.sleep(2.0) + dbManager = None # release? + + def prepare(self): + self._printLastNumbers() + + def run(self): + self._printLastNumbers() + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) - tc = ThreadCoordinator(thPool, dbManager) + self.tc = ThreadCoordinator(thPool, dbManager) - tc.run() - tc.logStats() - dbManager.cleanUp() + self.tc.run() + self.conclude() + + def conclude(self): + self.tc.printStats() + self.tc.getDbManager().cleanUp() + + +class MainExec: + STATUS_RUNNING = 1 + STATUS_STOPPING = 2 + # STATUS_STOPPED = 3 # Not used yet + + @classmethod + def runClient(cls): + clientManager = ClientManager() + clientManager.run() @classmethod def runService(cls): - print("Running service...") + svcManager = SvcManager() + svcManager.run() @classmethod def runTemp(cls): # for debugging purposes