diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 95507f0142036f36f9a5249a48800bd481513844..1cd65c1dde4e9c2027a82730cf4fc12290048bbe 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -22,7 +22,7 @@ from queue import Queue, Empty from .shared.config import Config from .shared.db import DbTarget, DbConn from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice -from .shared.types import DirPath +from .shared.types import DirPath, IpcStream # from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status # from crash_gen.db import DbConn, DbTarget @@ -177,13 +177,12 @@ quorum 2 return "127.0.0.1" def getServiceCmdLine(self): # to start the instance - cmdLine = [] if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") - cmdLine = ['valgrind', '--leak-check=yes'] - # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control - cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() - return cmdLine + return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] + else: + # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control + return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() def _getDnodes(self, dbc): dbc.query("show dnodes") @@ -281,16 +280,16 @@ class TdeSubProcess: return '[TdeSubProc: pid = {}, status = {}]'.format( self.getPid(), self.getStatus() ) - def getStdOut(self) -> BinaryIO : + def getIpcStdOut(self) -> IpcStream : if self._popen.universal_newlines : # alias of text_mode raise CrashGenError("We need binary mode for STDOUT IPC") # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout))) - return typing.cast(BinaryIO, self._popen.stdout) + return typing.cast(IpcStream, self._popen.stdout) - def getStdErr(self) -> BinaryIO : + def getIpcStdErr(self) -> IpcStream : if self._popen.universal_newlines : # alias of text_mode raise CrashGenError("We need binary mode for STDERR IPC") - return typing.cast(BinaryIO, self._popen.stderr) + return typing.cast(IpcStream, self._popen.stderr) # Now it's always running, since we matched the life cycle # def isRunning(self): @@ -301,11 +300,6 @@ class TdeSubProcess: def _start(self, cmdLine) -> Popen : ON_POSIX = 'posix' in sys.builtin_module_names - - # Sanity check - # if self.subProcess: # already there - # raise RuntimeError("Corrupt process state") - # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment @@ -314,9 +308,8 @@ class TdeSubProcess: # print(myEnv) # print("Starting TDengine with env: ", myEnv.items()) - # print("Starting TDengine via Shell: {}".format(cmdLineStr)) + print("Starting TDengine: {}".format(cmdLine)) - # useShell = True # Needed to pass environments into it return Popen( ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, shell=True, # Always use shell, since we need to pass ENV vars @@ -732,19 +725,19 @@ class ServiceManagerThread: self._ipcQueue = Queue() # type: Queue self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, - args=(subProc.getStdOut(), self._ipcQueue, logDir)) + args=(subProc.getIpcStdOut(), self._ipcQueue, logDir)) self._thread.daemon = True # thread dies with the program self._thread.start() time.sleep(0.01) if not self._thread.is_alive(): # What happened? - Logging.info("Failed to started process to monitor STDOUT") + Logging.info("Failed to start process to monitor STDOUT") self.stop() raise CrashGenError("Failed to start thread to monitor STDOUT") Logging.info("Successfully started process to monitor STDOUT") self._thread2 = threading.Thread( # 2nd thread captures server ERRORs target=self.svcErrorReader, - args=(subProc.getStdErr(), self._ipcQueue, logDir)) + args=(subProc.getIpcStdErr(), self._ipcQueue, logDir)) self._thread2.daemon = True # thread dies with the program self._thread2.start() time.sleep(0.01) @@ -887,14 +880,19 @@ class ServiceManagerThread: print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437'))) return None - def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str + def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str ) -> Generator[TextChunk, None, None]: ''' - Take an input stream with binary data, produced a generator of decoded - "text chunks", and also save the original binary data in a log file. + Take an input stream with binary data (likely from Popen), produced a generator of decoded + "text chunks". + + Side effect: it also save the original binary data in a log file. ''' os.makedirs(logDir, exist_ok=True) logF = open(os.path.join(logDir, logFile), 'wb') + if logF is None: + Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile)) + return for bChunk in iter(streamIn.readline, b''): logF.write(bChunk) # Write to log file immediately tChunk = self._decodeBinaryChunk(bChunk) # decode @@ -902,14 +900,14 @@ class ServiceManagerThread: yield tChunk # TODO: split into actual text lines # At the end... - streamIn.close() # Close the stream - logF.close() # Close the output file + streamIn.close() # Close the incoming stream + logF.close() # Close the log file - def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str): + def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. - :param stdOut: the IO stream object used to fetch the data from + :param ipcStdOut: the IO stream object used to fetch the data from :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data :param logDir: where we should dump a verbatim output file ''' @@ -917,7 +915,7 @@ class ServiceManagerThread: # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") # stdOut.readline() # Skip the first output? TODO: remove? - for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') : + for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') : queue.put(tChunk) # tChunk garanteed not to be None self._printProgress("_i") @@ -940,12 +938,12 @@ class ServiceManagerThread: Logging.info("EOF found TDengine STDOUT, marking the process as terminated") self.setStatus(Status.STATUS_STOPPED) - def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str): + def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str): # os.makedirs(logDir, exist_ok=True) # logFile = os.path.join(logDir,'stderr.log') # fErr = open(logFile, 'wb') # for line in iter(err.readline, b''): - for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') : + for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') : queue.put(tChunk) # tChunk garanteed not to be None # fErr.write(line) Logging.info("TDengine STDERR: {}".format(tChunk)) diff --git a/tests/pytest/crash_gen/shared/types.py b/tests/pytest/crash_gen/shared/types.py index 814a8219178d1c01ee0291447939984bdf81df9d..42fd2a1617cf729e4f23fc61a685027f738bc4a3 100644 --- a/tests/pytest/crash_gen/shared/types.py +++ b/tests/pytest/crash_gen/shared/types.py @@ -1,4 +1,4 @@ -from typing import Any, List, Dict, NewType +from typing import Any, BinaryIO, List, Dict, NewType from enum import Enum DirPath = NewType('DirPath', str) @@ -26,3 +26,5 @@ class TdDataType(Enum): TdColumns = Dict[str, TdDataType] TdTags = Dict[str, TdDataType] + +IpcStream = NewType('IpcStream', BinaryIO) \ No newline at end of file