diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 49c428b7f108b785f564564915aa2f8dbf52127e..afe61128a5b216f06a26d09ddd965f1ca99922c1 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1629,73 +1629,204 @@ class MyLoggingAdapter(logging.LoggerAdapter): # return '[%s] %s' % (self.extra['connid'], msg), kwargs class SvcManager: - + MAX_QUEUE_SIZE = 30 + def __init__(self): print("Starting service manager") signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler) + signal.signal(signal.SIGUSR1, self.sigUsrHandler) self.ioThread = None self.subProcess = None self.shouldStop = False - self.status = MainExec.STATUS_RUNNING + # self.status = MainExec.STATUS_RUNNING # set inside _startTaosService() def svcOutputReader(self, out: IO, queue): - # print("This is the svcOutput Reader...") - for line in out : # iter(out.readline, b''): + # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python + print("This is the svcOutput Reader...") + # for line in out : + for line in iter(out.readline, b''): # print("Finished reading a line: {}".format(line)) - queue.put(line.rstrip()) # get rid of new lines - print("No more output from incoming IO") # meaning sub process must have died + # print("Adding item to queue...") + line = line.decode("utf-8").rstrip() + queue.put(line) # This might block, and then causing "out" buffer to block + print("_i", end="", flush=True) + + # Trim the queue if necessary + oneTenthQSize = self.MAX_QUEUE_SIZE // 10 + if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full? + print("Triming IPC queue by: {}".format(oneTenthQSize)) + for i in range(0, oneTenthQSize) : + try: + queue.get_nowait() + except Empty: + break # break out of for loop, no more trimming + + if self.shouldStop : + print("Stopping to read output from sub process") + break + + # queue.put(line) + print("No more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died out.close() - def sigIntHandler(self, signalNumber, frame): + def _doMenu(self): + choice = "" + while True: + print("\nInterrupting Service Program, Choose an Action: ") + print("1: Resume") + print("2: Terminate") + print("3: Restart") + # Remember to update the if range below + # print("Enter Choice: ", end="", flush=True) + while choice == "": + choice = input("Enter Choice: ") + if choice != "": + break # done with reading repeated input + if choice in ["1", "2", "3"]: + break # we are done with whole method + print("Invalid choice, please try again.") + choice = "" # reset + return choice + + def sigUsrHandler(self, signalNumber, frame) : + print("Interrupting main thread execution upon SIGUSR1") if self.status != MainExec.STATUS_RUNNING : - print("Ignoring repeated SIGINT...") + print("Ignoring repeated SIG...") return # do nothing if it's already not running - self.status = MainExec.STATUS_STOPPING # immediately set our status + self.status = MainExec.STATUS_STOPPING + + choice = self._doMenu() + if choice == "1" : + self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? + elif choice == "2" : + self._stopTaosService() + elif choice == "3" : + self._stopTaosService() + self._startTaosService() + else: + raise RuntimeError("Invalid menu choice: {}".format(choice)) - print("Terminating program...") - self.subProcess.send_signal(signal.SIGINT) - self.shouldStop = True - self.joinIoThread() + def sigIntHandler(self, signalNumber, frame): + print("Sig INT Handler starting...") + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIG_INT...") + return + + self.status = MainExec.STATUS_STOPPING # immediately set our status + self._stopTaosService() + print("INT signal handler returning...") + + def sigHandlerResume(self) : + print("Resuming TDengine service manager thread (main thread)...\n\n") + self.status = MainExec.STATUS_RUNNING def joinIoThread(self): if self.ioThread : self.ioThread.join() - self.ioThread = None + self.ioThread = None + else : + print("Joining empty thread, doing nothing") + + def _procIpcBatch(self): + # Process all the output generated by the underlying sub process, managed by IO thread + while True : + try: + line = self.ipcQueue.get_nowait() # getting output at fast speed + print("_o", end="", flush=True) + except Empty: + # time.sleep(2.3) # wait only if there's no output + # no more output + return # we are done with THIS BATCH + else: # got line + print(line) - def run(self): + def _procIpcAll(self): + while True : + print("<<", end="", flush=True) + self._procIpcBatch() # process one batch + + # check if the ioThread is still running + if (not self.ioThread) or (not self.ioThread.is_alive()): + print("IO Thread (with subprocess) has ended, main thread now exiting...") + self._stopTaosService() + self._procIpcBatch() # one more batch + return # TODO: maybe one last batch? + + # Maybe handler says we should exit now + if self.shouldStop: + print("Main thread ending all IPC processing with IOThread/SubProcess") + self._procIpcBatch() # one more batch + return + + print(">>", end="", flush=True) + time.sleep(0.5) + + def _startTaosService(self): ON_POSIX = 'posix' in sys.builtin_module_names svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] # svcCmd = ['vmstat', '1'] - self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True) - q = Queue() - self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q)) + if self.subProcess : # already there + raise RuntimeError("Corrupt process state") + + self.subProcess = subprocess.Popen( + svcCmd, + stdout=subprocess.PIPE, + # bufsize=1, # not supported in binary mode + close_fds=ON_POSIX) # had text=True, which interferred with reading EOF + self.ipcQueue = Queue() + + if self.ioThread : + raise RuntimeError("Corrupt thread state") + self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue)) self.ioThread.daemon = True # thread dies with the program self.ioThread.start() + self.shouldStop = False # don't let the main loop stop + self.status = MainExec.STATUS_RUNNING + + def _stopTaosService(self): + # can be called from both main thread or signal handler + print("Terminating TDengine service running as the sub process...") + # Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes + if not self.subProcess : + print("Process already stopped") + return + + retCode = self.subProcess.poll() + if retCode : # valid return code, process ended + self.subProcess = None + else: # process still alive, let's interrupt it + print("Sub process still running, sending SIG_INT and waiting for it to stop...") + self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end + try : + self.subProcess.wait(10) + except subprocess.TimeoutExpired as err: + print("Time out waiting for TDengine service process to exit") + else: + print("Sub process has ended") + self.subProcess = None + + if self.subProcess and (not self.subProcess.poll()): + print("Sub process is still running... pid = {}".format(self.subProcess.pid)) + + self.shouldStop = True + self.joinIoThread() + + def run(self): + self._startTaosService() + # proc = subprocess.Popen(['echo', '"to stdout"'], # stdout=subprocess.PIPE, # ) # stdout_value = proc.communicate()[0] # print('\tstdout: {}'.format(repr(stdout_value))) - while True : - try: - line = q.get_nowait() # getting output at fast speed - except Empty: - # print('no output yet') - time.sleep(2.3) # wait only if there's no output - else: # got line - print(line) - # print("----end of iteration----") - if self.shouldStop: - print("Ending main Svc thread") - break + self._procIpcAll() - print("end of loop") - - self.joinIoThread() - print("Finished") + print("End of loop reading from IPC queue") + self.joinIoThread() # should have started already + print("SvcManager Run Finished") class ClientManager: def __init__(self):