diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9af72af47193c3b8b4cd7e9b2cba84d63fef9f9f..547bc85669ce818229934d649bf78d46808a7f8e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -84,8 +84,17 @@ class WorkerThread: # Let us have a DB connection of our own if (gConfig.per_thread_db_connection): # type: ignore # print("connector_type = {}".format(gConfig.connector_type)) - self._dbConn = DbConn.createNative() if ( - gConfig.connector_type == 'native') else DbConn.createRest() + if gConfig.connector_type == 'native': + self._dbConn = DbConn.createNative() + elif gConfig.connector_type == 'rest': + self._dbConn = DbConn.createRest() + elif gConfig.connector_type == 'mixed': + if Dice.throw(2) == 0: # 1/2 chance + self._dbConn = DbConn.createNative() + else: + self._dbConn = DbConn.createRest() + else: + raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) self._dbInUse = False # if "use db" was executed already @@ -130,22 +139,15 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug( - "[TRD] Worker thread [{}] exited barrier...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug( - "[TRD] Worker thread [{}] exited step gate...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): - logger.debug( - "[TRD] Thread Coordinator not running any more, worker thread now stopping...") + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break # Fetch a task from the Thread Coordinator - logger.debug( - "[TRD] Worker thread [{}] about to fetch task".format( - self._tid)) + logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() # Execute such a task @@ -154,9 +156,7 @@ class WorkerThread: self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) - logger.debug( - "[TRD] Worker thread [{}] finished executing task".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) self._dbInUse = False # there may be changes between steps @@ -255,101 +255,124 @@ class ThreadCoordinator: self._runStatus = MainExec.STATUS_STOPPING self._execStats.registerFailure("User Interruption") + def _runShouldEnd(self, transitionFailed, hasAbortedTask): + maxSteps = gConfig.max_steps # type: ignore + if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 + return True + if self._runStatus != MainExec.STATUS_RUNNING: + return True + if transitionFailed: + return True + if hasAbortedTask: + return True + return False + + def _hasAbortedTask(self): # from execution of previous step + for task in self._executedTasks: + if task.isAborted(): + # print("Task aborted: {}".format(task)) + # hasAbortedTask = True + return True + return False + + def _releaseAllWorkerThreads(self, transitionFailed): + self._curStep += 1 # we are about to get into next step. TODO: race condition here! + # Now not all threads had time to go to sleep + logger.debug( + "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + + # A new TE for the new step + self._te = None # set to empty first, to signal worker thread to stop + if not transitionFailed: # only if not failed + self._te = TaskExecutor(self._curStep) + + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( + self._curStep)) # Now not all threads had time to go to sleep + # Worker threads will wake up at this point, and each execute it's own task + self.tapAllThreads() # release all worker thread from their "gate" + + def _syncAtBarrier(self): + # Now main thread (that's us) is ready to enter a step + # let other threads go past the pool barrier, but wait at the + # thread gate + logger.debug("[TRD] Main thread about to cross the barrier") + self.crossStepBarrier() + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + logger.debug("[TRD] Main thread finished crossing the barrier") + + def _doTransition(self): + transitionFailed = False + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + # at end of step, transiton the DB state + sm.transition(self._executedTasks) + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, + # we cannot share connections across threads + if sm.hasDatabase(): + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use + # db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if (err.msg == 'network unavailable'): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at + # end, and maybe signal them to stop + else: + raise + + self.resetExecutedTasks() # clear the tasks after we are done + # Get ready for next step + logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) + return transitionFailed + def run(self): self._pool.createAndStartThreads(self) # Coordinate all threads step by step self._curStep = -1 # not started yet - maxSteps = gConfig.max_steps # type: ignore + self._execStats.startExec() # start the stop watch transitionFailed = False hasAbortedTask = False - while(self._curStep < maxSteps - 1 and - (not transitionFailed) and - (self._runStatus == MainExec.STATUS_RUNNING) and - (not hasAbortedTask)): # maxStep==10, last curStep should be 9 - - if not gConfig.debug: - # print this only if we are not in debug mode + while not self._runShouldEnd(transitionFailed, hasAbortedTask): + if not gConfig.debug: # print this only if we are not in debug mode print(".", end="", flush=True) - logger.debug("[TRD] Main thread going to sleep") - - # Now main thread (that's us) is ready to enter a step - # let other threads go past the pool barrier, but wait at the - # thread gate - self.crossStepBarrier() - self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + self._syncAtBarrier() # For now just cross the barrier # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # We use this period to do house keeping work, when all worker # threads are QUIET. - hasAbortedTask = False - for task in self._executedTasks: - if task.isAborted(): - print("Task aborted: {}".format(task)) - hasAbortedTask = True - break - - if hasAbortedTask: # do transition only if tasks are error free + hasAbortedTask = self._hasAbortedTask() # from previous step + if hasAbortedTask: + logger.info("Aborted task encountered, exiting test program") self._execStats.registerFailure("Aborted Task Encountered") - else: - try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - # at end of step, transiton the DB state - sm.transition(self._executedTasks) - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, - # we cannot share connections across threads - if sm.hasDatabase(): - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() - # t.execSql("use db") # main thread executing "use - # db" on behalf of every worker thread - except taos.error.ProgrammingError as err: - if (err.msg == 'network unavailable'): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - transitionFailed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at - # end, and maybe signal them to stop - else: - raise - # finally: - # pass + break # do transition only if tasks are error free - self.resetExecutedTasks() # clear the tasks after we are done + # Ending previous step + transitionFailed = self._doTransition() # To start, we end step -1 first + # Then we move on to the next step + self._releaseAllWorkerThreads(transitionFailed) - # Get ready for next step - logger.debug("<-- Step {} finished".format(self._curStep)) - self._curStep += 1 # we are about to get into next step. TODO: race condition here! - # Now not all threads had time to go to sleep - logger.debug( - "\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" + logger.debug("Abnormal ending of main thraed") + else: # regular ending, workers waiting at "barrier" + logger.debug("Regular ending, main thread waiting for all worker threads to stop...") + self._syncAtBarrier() - # A new TE for the new step - if not transitionFailed: # only if not failed - self._te = TaskExecutor(self._curStep) - - logger.debug( - "[TRD] Main thread waking up at step {}, tapping worker threads".format( - self._curStep)) # Now not all threads had time to go to sleep - # Worker threads will wake up at this point, and each execute it's - # own task - self.tapAllThreads() - - logger.debug("Main thread ready to finish up...") - if not transitionFailed: # only in regular situations - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("\r\n\n--> Main thread ready to finish up...") logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish logger.info("\nAll worker threads finished") @@ -514,7 +537,7 @@ class LinearQueue(): class DbConn: TYPE_NATIVE = "native-c" - TYPE_REST = "rest-api" + TYPE_REST = "rest-api" TYPE_INVALID = "invalid" @classmethod @@ -620,9 +643,13 @@ class DbConnRest(DbConn): self.isOpen = False def _doSql(self, sql): - r = requests.post(self._url, - data=sql, - auth=HTTPBasicAuth('root', 'taosdata')) + try: + r = requests.post(self._url, + data = sql, + auth = HTTPBasicAuth('root', 'taosdata')) + except: + print("REST API Failure (TODO: more info here)") + raise rj = r.json() # Sanity check for the "Json Result" if ('status' not in rj): @@ -717,7 +744,7 @@ class MyTDSql: class DbConnNative(DbConn): def __init__(self): super().__init__() - self._type = self.TYPE_REST + self._type = self.TYPE_NATIVE self._conn = None self._cursor = None @@ -2254,8 +2281,9 @@ class ClientManager: def sigIntHandler(self, signalNumber, frame): if self._status != MainExec.STATUS_RUNNING: - print("Ignoring repeated SIGINT...") - return # do nothing if it's already not running + print("Repeated SIGINT received, forced exit...") + # return # do nothing if it's already not running + sys.exit(-1) self._status = MainExec.STATUS_STOPPING # immediately set our status print("Terminating program...") @@ -2394,6 +2422,27 @@ def main(): ''')) + # parser.add_argument('-a', '--auto-start-service', action='store_true', + # help='Automatically start/stop the TDengine service (default: false)') + # parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, + # help='Connector type to use: native, rest, or mixed (default: 10)') + # parser.add_argument('-d', '--debug', action='store_true', + # help='Turn on DEBUG mode for more logging (default: false)') + # parser.add_argument('-e', '--run-tdengine', action='store_true', + # help='Run TDengine service in foreground (default: false)') + # parser.add_argument('-l', '--larger-data', action='store_true', + # help='Write larger amount of data during write operations (default: false)') + # parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + # help='Use a single shared db connection (default: false)') + # parser.add_argument('-r', '--record-ops', action='store_true', + # help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') + # parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, + # help='Maximum number of steps to run (default: 100)') + # parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, + # help='Number of threads to run (default: 10)') + # parser.add_argument('-x', '--continue-on-exception', action='store_true', + # help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') + parser.add_argument( '-a', '--auto-start-service',