diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index dfe78b6048f6d3c13bdabc6b0b9a394e50b3d7c4..f43e746a710dace0f145ac82409dd8f5fff03565 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -42,6 +42,13 @@ import os import io import signal import traceback + +try: + import psutil +except: + print("Psutil module needed, please install: sudo pip3 install psutil") + sys.exit(-1) + # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -69,8 +76,7 @@ class CrashGenError(Exception): class WorkerThread: - def __init__(self, pool: ThreadPool, tid, - tc: ThreadCoordinator, + def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, # te: TaskExecutor, ): # note: main thread context! # self._curStep = -1 @@ -138,7 +144,12 @@ class WorkerThread: # tc = ThreadCoordinator(None) while True: tc = self._tc # Thread Coordinator, the overall master - tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + try: + tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + except threading.BrokenBarrierError as err: # main thread timed out + logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out") + break + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) @@ -248,14 +259,14 @@ class ThreadCoordinator: def getDbManager(self) -> DbManager: return self._dbManager - def crossStepBarrier(self): - self._stepBarrier.wait() + def crossStepBarrier(self, timeout=None): + self._stepBarrier.wait(timeout) def requestToStop(self): self._runStatus = MainExec.STATUS_STOPPING self._execStats.registerFailure("User Interruption") - def _runShouldEnd(self, transitionFailed, hasAbortedTask): + def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): maxSteps = gConfig.max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True @@ -265,6 +276,8 @@ class ThreadCoordinator: return True if hasAbortedTask: return True + if workerTimeout: + return True return False def _hasAbortedTask(self): # from execution of previous step @@ -296,7 +309,7 @@ class ThreadCoordinator: # let other threads go past the pool barrier, but wait at the # thread gate logger.debug("[TRD] Main thread about to cross the barrier") - self.crossStepBarrier() + self.crossStepBarrier(timeout=15) self._stepBarrier.reset() # Other worker threads should now be at the "gate" logger.debug("[TRD] Main thread finished crossing the barrier") @@ -342,11 +355,21 @@ class ThreadCoordinator: self._execStats.startExec() # start the stop watch transitionFailed = False hasAbortedTask = False - while not self._runShouldEnd(transitionFailed, hasAbortedTask): + workerTimeout = False + while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): if not gConfig.debug: # print this only if we are not in debug mode print(".", end="", flush=True) - self._syncAtBarrier() # For now just cross the barrier + try: + self._syncAtBarrier() # For now just cross the barrier + except threading.BrokenBarrierError as err: + logger.info("Main loop aborted, caused by worker thread time-out") + self._execStats.registerFailure("Aborted due to worker thread timeout") + print("\n\nWorker Thread time-out detected, important thread info:") + ts = ThreadStacks() + ts.print(filterInternal=True) + workerTimeout = True + break # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # We use this period to do house keeping work, when all worker @@ -364,6 +387,8 @@ class ThreadCoordinator: if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" logger.debug("Abnormal ending of main thraed") + elif workerTimeout: + logger.debug("Abnormal ending of main thread, due to worker timeout") else: # regular ending, workers waiting at "barrier" logger.debug("Regular ending, main thread waiting for all worker threads to stop...") self._syncAtBarrier() @@ -569,9 +594,7 @@ class DbConn: # below implemented by child classes self.openByType() - logger.debug( - "[DB] data connection opened, type = {}".format( - self._type)) + logger.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True def resetDb(self): # reset the whole database, etc. @@ -786,9 +809,7 @@ class DbConnNative(DbConn): self.__class__._connInfoDisplayed = True # updating CLASS variable logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) - self._conn = taos.connect( - host=hostAddr, - config=cfgPath) # TODO: make configurable + self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable self._cursor = self._conn.cursor() self._cursor.execute('reset query cache') @@ -1674,7 +1695,7 @@ class StateTransitionTask(Task): @classmethod def getRegTableName(cls, i): - return "db.reg_table_{}".format(i) + return "reg_table_{}".format(i) def execute(self, wt: WorkerThread): super().execute(wt) @@ -1984,9 +2005,9 @@ class MyLoggingAdapter(logging.LoggerAdapter): class SvcManager: def __init__(self): print("Starting TDengine Service Manager") - signal.signal(signal.SIGTERM, self.sigIntHandler) - signal.signal(signal.SIGINT, self.sigIntHandler) - signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! + # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec + # signal.signal(signal.SIGINT, self.sigIntHandler) + # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! self.inSigHandler = False # self._status = MainExec.STATUS_RUNNING # set inside @@ -2035,14 +2056,14 @@ class SvcManager: self.inSigHandler = False def sigIntHandler(self, signalNumber, frame): - print("Sig INT Handler starting...") + print("SvcManager: INT Signal Handler starting...") if self.inSigHandler: print("Ignoring repeated SIG_INT...") return self.inSigHandler = True self.stopTaosService() - print("INT signal handler returning...") + print("SvcManager: INT Signal Handler returning...") self.inSigHandler = False def sigHandlerResume(self): @@ -2064,8 +2085,16 @@ class SvcManager: def startTaosService(self): if self.svcMgrThread: - raise RuntimeError( - "Cannot start TAOS service when one may already be running") + raise RuntimeError("Cannot start TAOS service when one may already be running") + + # Find if there's already a taosd service, and then kill it + for proc in psutil.process_iter(): + if proc.name() == 'taosd': + print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") + time.sleep(2.0) + proc.kill() + # print("Process: {}".format(proc.name())) + self.svcMgrThread = ServiceManagerThread() # create the object self.svcMgrThread.start() print("TAOS service started, printing out output...") @@ -2075,9 +2104,11 @@ class SvcManager: print("TAOS service started") def stopTaosService(self, outputLines=20): + if not self.isRunning(): + logger.warning("Cannot stop TAOS service, not running") + return + print("Terminating Service Manager Thread (SMT) execution...") - if not self.svcMgrThread: - raise RuntimeError("Unexpected empty svc mgr thread") self.svcMgrThread.stop() if self.svcMgrThread.isStopped(): self.svcMgrThread.procIpcBatch(outputLines) # one last time @@ -2090,9 +2121,11 @@ class SvcManager: def run(self): self.startTaosService() self._procIpcAll() # pump/process all the messages - if self.svcMgrThread: # if sig handler hasn't destroyed it by now + if self.isRunning(): # if sig handler hasn't destroyed it by now self.stopTaosService() # should have started already + def isRunning(self): + return self.svcMgrThread != None class ServiceManagerThread: MAX_QUEUE_SIZE = 10000 @@ -2144,6 +2177,7 @@ class ServiceManagerThread: logger.info("[] TDengine service READY to process requests") return # now we've started # TODO: handle this better? + self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output raise RuntimeError("TDengine service did not start successfully") def stop(self): @@ -2292,6 +2326,15 @@ class TdeSubProcess: taosdPath = self.getBuildPath() + "/build/bin/taosd" cfgPath = self.getBuildPath() + "/test/cfg" + # Delete the log files + logPath = self.getBuildPath() + "/test/log" + # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 + filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ] + for f in filelist: + filePath = os.path.join(logPath, f) + print("Removing log file: {}".format(filePath)) + os.remove(filePath) + svcCmd = [taosdPath, '-c', cfgPath] # svcCmd = ['vmstat', '1'] if self.subProcess: # already there @@ -2325,16 +2368,46 @@ class TdeSubProcess: print("TDengine service process terminated successfully from SIG_INT") self.subProcess = None +class ThreadStacks: # stack info for all threads + def __init__(self): + self._allStacks = {} + allFrames = sys._current_frames() + for th in threading.enumerate(): + stack = traceback.extract_stack(allFrames[th.ident]) + self._allStacks[th.native_id] = stack + + def print(self, filteredEndName = None, filterInternal = False): + for thNid, stack in self._allStacks.items(): # for each thread + lastFrame = stack[-1] + if filteredEndName: # we need to filter out stacks that match this name + if lastFrame.name == filteredEndName : # end did not match + continue + if filterInternal: + if lastFrame.name in ['wait', 'invoke_excepthook', + '_wait', # The Barrier exception + 'svcOutputReader', # the svcMgr thread + '__init__']: # the thread that extracted the stack + continue # ignore + # Now print + print("\n<----- Thread Info for ID: {}".format(thNid)) + for frame in stack: + # print(frame) + print("File {filename}, line {lineno}, in {name}".format( + filename=frame.filename, lineno=frame.lineno, name=frame.name)) + print(" {}".format(frame.line)) + print("-----> End of Thread Info\n") class ClientManager: def __init__(self): print("Starting service manager") - signal.signal(signal.SIGTERM, self.sigIntHandler) - signal.signal(signal.SIGINT, self.sigIntHandler) + # signal.signal(signal.SIGTERM, self.sigIntHandler) + # signal.signal(signal.SIGINT, self.sigIntHandler) self._status = MainExec.STATUS_RUNNING self.tc = None + self.inSigHandler = False + def sigIntHandler(self, signalNumber, frame): if self._status != MainExec.STATUS_RUNNING: print("Repeated SIGINT received, forced exit...") @@ -2342,9 +2415,50 @@ class ClientManager: sys.exit(-1) self._status = MainExec.STATUS_STOPPING # immediately set our status - print("Terminating program...") + print("ClientManager: Terminating program...") self.tc.requestToStop() + def _doMenu(self): + choice = "" + while True: + print("\nInterrupting Client Program, Choose an Action: ") + print("1: Resume") + print("2: Terminate") + print("3: Show Threads") + # Remember to update the if range below + # print("Enter Choice: ", end="", flush=True) + while choice == "": + choice = input("Enter Choice: ") + if choice != "": + break # done with reading repeated input + if choice in ["1", "2", "3"]: + break # we are done with whole method + print("Invalid choice, please try again.") + choice = "" # reset + return choice + + def sigUsrHandler(self, signalNumber, frame): + print("Interrupting main thread execution upon SIGUSR1") + if self.inSigHandler: # already + print("Ignoring repeated SIG_USR1...") + return # do nothing if it's already not running + self.inSigHandler = True + + choice = self._doMenu() + if choice == "1": + print("Resuming execution...") + time.sleep(1.0) + elif choice == "2": + print("Not implemented yet") + time.sleep(1.0) + elif choice == "3": + ts = ThreadStacks() + ts.print() + else: + raise RuntimeError("Invalid menu choice: {}".format(choice)) + + self.inSigHandler = False + def _printLastNumbers(self): # to verify data durability dbManager = DbManager(resetDb=False) dbc = dbManager.getDbConn() @@ -2377,11 +2491,7 @@ class ClientManager: def prepare(self): self._printLastNumbers() - def run(self): - if gConfig.auto_start_service: - svcMgr = SvcManager() - svcMgr.startTaosService() - + def run(self, svcMgr): self._printLastNumbers() dbManager = DbManager() # Regular function @@ -2391,7 +2501,7 @@ class ClientManager: self.tc.run() # print("exec stats: {}".format(self.tc.getExecStats())) # print("TC failed = {}".format(self.tc.isFailed())) - if gConfig.auto_start_service: + if svcMgr: # gConfig.auto_start_service: svcMgr.stopTaosService() # Print exec status, etc., AFTER showing messages from the server self.conclude() @@ -2410,18 +2520,39 @@ class MainExec: STATUS_STOPPING = 3 STATUS_STOPPED = 4 - @classmethod - def runClient(cls): - clientManager = ClientManager() - return clientManager.run() + def __init__(self): + self._clientMgr = None + self._svcMgr = None - @classmethod - def runService(cls): - svcManager = SvcManager() - svcManager.run() + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! - @classmethod - def runTemp(cls): # for debugging purposes + def sigUsrHandler(self, signalNumber, frame): + if self._clientMgr: + self._clientMgr.sigUsrHandler(signalNumber, frame) + elif self._svcMgr: # Only if no client mgr, we are running alone + self._svcMgr.sigUsrHandler(signalNumber, frame) + + def sigIntHandler(self, signalNumber, frame): + if self._svcMgr: + self._svcMgr.sigIntHandler(signalNumber, frame) + if self._clientMgr: + self._clientMgr.sigIntHandler(signalNumber, frame) + + def runClient(self): + if gConfig.auto_start_service: + self._svcMgr = SvcManager() + self._svcMgr.startTaosService() # we start, don't run + + self._clientMgr = ClientManager() + ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside + + def runService(self): + self._svcMgr = SvcManager() + self._svcMgr.run() # run to some end state + + def runTemp(self): # for debugging purposes # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix # dbc = dbState.getDbConn() # sTbName = dbState.getFixedSuperTableName() @@ -2577,10 +2708,11 @@ def main(): Dice.seed(0) # initial seeding of dice # Run server or client + mExec = MainExec() if gConfig.run_tdengine: # run server - MainExec.runService() + mExec.runService() else: - return MainExec.runClient() + return mExec.runClient() if __name__ == "__main__":