From d8206d228f16913ced16217c75006c42fa646279 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 14 Jul 2020 21:46:01 +0000 Subject: [PATCH] refactoring thread synchronization for crash_gen tool --- tests/pytest/crash_gen.py | 201 ++++++++++++++++++++------------------ 1 file changed, 108 insertions(+), 93 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 07a745efa0..4a014d9705 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -130,22 +130,15 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug( - "[TRD] Worker thread [{}] exited barrier...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug( - "[TRD] Worker thread [{}] exited step gate...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): - logger.debug( - "[TRD] Thread Coordinator not running any more, worker thread now stopping...") + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break # Fetch a task from the Thread Coordinator - logger.debug( - "[TRD] Worker thread [{}] about to fetch task".format( - self._tid)) + logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() # Execute such a task @@ -154,9 +147,7 @@ class WorkerThread: self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) - logger.debug( - "[TRD] Worker thread [{}] finished executing task".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) self._dbInUse = False # there may be changes between steps @@ -255,101 +246,124 @@ class ThreadCoordinator: self._runStatus = MainExec.STATUS_STOPPING self._execStats.registerFailure("User Interruption") + def _runShouldEnd(self, transitionFailed, hasAbortedTask): + maxSteps = gConfig.max_steps # type: ignore + if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 + return True + if self._runStatus != MainExec.STATUS_RUNNING: + return True + if transitionFailed: + return True + if hasAbortedTask: + return True + return False + + def _hasAbortedTask(self): # from execution of previous step + for task in self._executedTasks: + if task.isAborted(): + # print("Task aborted: {}".format(task)) + # hasAbortedTask = True + return True + return False + + def _releaseAllWorkerThreads(self, transitionFailed): + self._curStep += 1 # we are about to get into next step. TODO: race condition here! + # Now not all threads had time to go to sleep + logger.debug( + "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + + # A new TE for the new step + self._te = None # set to empty first, to signal worker thread to stop + if not transitionFailed: # only if not failed + self._te = TaskExecutor(self._curStep) + + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( + self._curStep)) # Now not all threads had time to go to sleep + # Worker threads will wake up at this point, and each execute it's own task + self.tapAllThreads() # release all worker thread from their "gate" + + def _syncAtBarrier(self): + # Now main thread (that's us) is ready to enter a step + # let other threads go past the pool barrier, but wait at the + # thread gate + logger.debug("[TRD] Main thread about to cross the barrier") + self.crossStepBarrier() + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + logger.debug("[TRD] Main thread finished crossing the barrier") + + def _doTransition(self): + transitionFailed = False + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + # at end of step, transiton the DB state + sm.transition(self._executedTasks) + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, + # we cannot share connections across threads + if sm.hasDatabase(): + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use + # db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if (err.msg == 'network unavailable'): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at + # end, and maybe signal them to stop + else: + raise + + self.resetExecutedTasks() # clear the tasks after we are done + # Get ready for next step + logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) + return transitionFailed + def run(self): self._pool.createAndStartThreads(self) # Coordinate all threads step by step self._curStep = -1 # not started yet - maxSteps = gConfig.max_steps # type: ignore + self._execStats.startExec() # start the stop watch transitionFailed = False hasAbortedTask = False - while(self._curStep < maxSteps - 1 and - (not transitionFailed) and - (self._runStatus == MainExec.STATUS_RUNNING) and - (not hasAbortedTask)): # maxStep==10, last curStep should be 9 - - if not gConfig.debug: - # print this only if we are not in debug mode + while not self._runShouldEnd(transitionFailed, hasAbortedTask): + if not gConfig.debug: # print this only if we are not in debug mode print(".", end="", flush=True) - logger.debug("[TRD] Main thread going to sleep") - - # Now main thread (that's us) is ready to enter a step - # let other threads go past the pool barrier, but wait at the - # thread gate - self.crossStepBarrier() - self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + self._syncAtBarrier() # For now just cross the barrier # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # We use this period to do house keeping work, when all worker # threads are QUIET. - hasAbortedTask = False - for task in self._executedTasks: - if task.isAborted(): - print("Task aborted: {}".format(task)) - hasAbortedTask = True - break - - if hasAbortedTask: # do transition only if tasks are error free + hasAbortedTask = self._hasAbortedTask() # from previous step + if hasAbortedTask: + logger.info("Aborted task encountered, exiting test program") self._execStats.registerFailure("Aborted Task Encountered") - else: - try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - # at end of step, transiton the DB state - sm.transition(self._executedTasks) - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, - # we cannot share connections across threads - if sm.hasDatabase(): - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() - # t.execSql("use db") # main thread executing "use - # db" on behalf of every worker thread - except taos.error.ProgrammingError as err: - if (err.msg == 'network unavailable'): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - transitionFailed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at - # end, and maybe signal them to stop - else: - raise - # finally: - # pass + break # do transition only if tasks are error free - self.resetExecutedTasks() # clear the tasks after we are done + # Ending previous step + transitionFailed = self._doTransition() # To start, we end step -1 first + # Then we move on to the next step + self._releaseAllWorkerThreads(transitionFailed) - # Get ready for next step - logger.debug("<-- Step {} finished".format(self._curStep)) - self._curStep += 1 # we are about to get into next step. TODO: race condition here! - # Now not all threads had time to go to sleep - logger.debug( - "\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" + logger.debug("Abnormal ending of main thraed") + else: # regular ending, workers waiting at "barrier" + logger.debug("Regular ending, main thread waiting for all worker threads to stop...") + self._syncAtBarrier() - # A new TE for the new step - if not transitionFailed: # only if not failed - self._te = TaskExecutor(self._curStep) - - logger.debug( - "[TRD] Main thread waking up at step {}, tapping worker threads".format( - self._curStep)) # Now not all threads had time to go to sleep - # Worker threads will wake up at this point, and each execute it's - # own task - self.tapAllThreads() - - logger.debug("Main thread ready to finish up...") - if not transitionFailed: # only in regular situations - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("\r\n\n--> Main thread ready to finish up...") logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish logger.info("\nAll worker threads finished") @@ -2258,8 +2272,9 @@ class ClientManager: def sigIntHandler(self, signalNumber, frame): if self._status != MainExec.STATUS_RUNNING: - print("Ignoring repeated SIGINT...") - return # do nothing if it's already not running + print("Repeated SIGINT received, forced exit...") + # return # do nothing if it's already not running + sys.exit(-1) self._status = MainExec.STATUS_STOPPING # immediately set our status print("Terminating program...") -- GitLab