提交 d8206d22 编写于 作者: S Steven Li

refactoring thread synchronization for crash_gen tool

上级 a75721ff
...@@ -130,22 +130,15 @@ class WorkerThread: ...@@ -130,22 +130,15 @@ class WorkerThread:
while True: while True:
tc = self._tc # Thread Coordinator, the overall master tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
logger.debug( logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
"[TRD] Worker thread [{}] exited barrier...".format(
self._tid))
self.crossStepGate() # then per-thread gate, after being tapped self.crossStepGate() # then per-thread gate, after being tapped
logger.debug( logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
"[TRD] Worker thread [{}] exited step gate...".format(
self._tid))
if not self._tc.isRunning(): if not self._tc.isRunning():
logger.debug( logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
"[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Fetch a task from the Thread Coordinator # Fetch a task from the Thread Coordinator
logger.debug( logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
"[TRD] Worker thread [{}] about to fetch task".format(
self._tid))
task = tc.fetchTask() task = tc.fetchTask()
# Execute such a task # Execute such a task
...@@ -154,9 +147,7 @@ class WorkerThread: ...@@ -154,9 +147,7 @@ class WorkerThread:
self._tid, task.__class__.__name__)) self._tid, task.__class__.__name__))
task.execute(self) task.execute(self)
tc.saveExecutedTask(task) tc.saveExecutedTask(task)
logger.debug( logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
"[TRD] Worker thread [{}] finished executing task".format(
self._tid))
self._dbInUse = False # there may be changes between steps self._dbInUse = False # there may be changes between steps
...@@ -255,44 +246,53 @@ class ThreadCoordinator: ...@@ -255,44 +246,53 @@ class ThreadCoordinator:
self._runStatus = MainExec.STATUS_STOPPING self._runStatus = MainExec.STATUS_STOPPING
self._execStats.registerFailure("User Interruption") self._execStats.registerFailure("User Interruption")
def run(self): def _runShouldEnd(self, transitionFailed, hasAbortedTask):
self._pool.createAndStartThreads(self)
# Coordinate all threads step by step
self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore maxSteps = gConfig.max_steps # type: ignore
self._execStats.startExec() # start the stop watch if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
transitionFailed = False return True
hasAbortedTask = False if self._runStatus != MainExec.STATUS_RUNNING:
while(self._curStep < maxSteps - 1 and return True
(not transitionFailed) and if transitionFailed:
(self._runStatus == MainExec.STATUS_RUNNING) and return True
(not hasAbortedTask)): # maxStep==10, last curStep should be 9 if hasAbortedTask:
return True
return False
if not gConfig.debug: def _hasAbortedTask(self): # from execution of previous step
# print this only if we are not in debug mode for task in self._executedTasks:
print(".", end="", flush=True) if task.isAborted():
logger.debug("[TRD] Main thread going to sleep") # print("Task aborted: {}".format(task))
# hasAbortedTask = True
return True
return False
def _releaseAllWorkerThreads(self, transitionFailed):
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
# Now not all threads had time to go to sleep
logger.debug(
"--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
# A new TE for the new step
self._te = None # set to empty first, to signal worker thread to stop
if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep)
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
self._curStep)) # Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's own task
self.tapAllThreads() # release all worker thread from their "gate"
def _syncAtBarrier(self):
# Now main thread (that's us) is ready to enter a step # Now main thread (that's us) is ready to enter a step
# let other threads go past the pool barrier, but wait at the # let other threads go past the pool barrier, but wait at the
# thread gate # thread gate
logger.debug("[TRD] Main thread about to cross the barrier")
self.crossStepBarrier() self.crossStepBarrier()
self._stepBarrier.reset() # Other worker threads should now be at the "gate" self._stepBarrier.reset() # Other worker threads should now be at the "gate"
logger.debug("[TRD] Main thread finished crossing the barrier")
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" def _doTransition(self):
# We use this period to do house keeping work, when all worker transitionFailed = False
# threads are QUIET.
hasAbortedTask = False
for task in self._executedTasks:
if task.isAborted():
print("Task aborted: {}".format(task))
hasAbortedTask = True
break
if hasAbortedTask: # do transition only if tasks are error free
self._execStats.registerFailure("Aborted Task Encountered")
else:
try: try:
sm = self._dbManager.getStateMachine() sm = self._dbManager.getStateMachine()
logger.debug("[STT] starting transitions") logger.debug("[STT] starting transitions")
...@@ -318,38 +318,52 @@ class ThreadCoordinator: ...@@ -318,38 +318,52 @@ class ThreadCoordinator:
# end, and maybe signal them to stop # end, and maybe signal them to stop
else: else:
raise raise
# finally:
# pass
self.resetExecutedTasks() # clear the tasks after we are done self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step # Get ready for next step
logger.debug("<-- Step {} finished".format(self._curStep)) logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
self._curStep += 1 # we are about to get into next step. TODO: race condition here! return transitionFailed
# Now not all threads had time to go to sleep
logger.debug(
"\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
# A new TE for the new step def run(self):
if not transitionFailed: # only if not failed self._pool.createAndStartThreads(self)
self._te = TaskExecutor(self._curStep)
# Coordinate all threads step by step
self._curStep = -1 # not started yet
self._execStats.startExec() # start the stop watch
transitionFailed = False
hasAbortedTask = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask):
if not gConfig.debug: # print this only if we are not in debug mode
print(".", end="", flush=True)
self._syncAtBarrier() # For now just cross the barrier
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
# We use this period to do house keeping work, when all worker
# threads are QUIET.
hasAbortedTask = self._hasAbortedTask() # from previous step
if hasAbortedTask:
logger.info("Aborted task encountered, exiting test program")
self._execStats.registerFailure("Aborted Task Encountered")
break # do transition only if tasks are error free
# Ending previous step
transitionFailed = self._doTransition() # To start, we end step -1 first
# Then we move on to the next step
self._releaseAllWorkerThreads(transitionFailed)
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
logger.debug("Abnormal ending of main thraed")
else: # regular ending, workers waiting at "barrier"
logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
self._syncAtBarrier()
logger.debug(
"[TRD] Main thread waking up at step {}, tapping worker threads".format(
self._curStep)) # Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's
# own task
self.tapAllThreads()
logger.debug("Main thread ready to finish up...")
if not transitionFailed: # only in regular situations
self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...")
self._te = None # No more executor, time to end self._te = None # No more executor, time to end
logger.debug("Main thread tapping all threads one last time...") logger.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time self.tapAllThreads() # Let the threads run one last time
logger.debug("\r\n\n--> Main thread ready to finish up...")
logger.debug("Main thread joining all threads") logger.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish self._pool.joinAll() # Get all threads to finish
logger.info("\nAll worker threads finished") logger.info("\nAll worker threads finished")
...@@ -2258,8 +2272,9 @@ class ClientManager: ...@@ -2258,8 +2272,9 @@ class ClientManager:
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
if self._status != MainExec.STATUS_RUNNING: if self._status != MainExec.STATUS_RUNNING:
print("Ignoring repeated SIGINT...") print("Repeated SIGINT received, forced exit...")
return # do nothing if it's already not running # return # do nothing if it's already not running
sys.exit(-1)
self._status = MainExec.STATUS_STOPPING # immediately set our status self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...") print("Terminating program...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册