From 34eb7e255a021a58d7dae26bb75e9bf799fabac7 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 28 Apr 2021 23:11:19 +0000 Subject: [PATCH] Used Python generator to simplify crash_gen IPC processing of STDOUT/STDERR --- tests/pytest/crash_gen/service_manager.py | 133 +++++++++++----------- 1 file changed, 69 insertions(+), 64 deletions(-) diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 4afd7e4c78..f6784f17e3 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -9,7 +9,8 @@ import signal import logging import time from subprocess import PIPE, Popen, TimeoutExpired -from typing import IO, List, NewType, Optional +from typing import BinaryIO, Generator, IO, List, NewType, Optional +import typing try: import psutil @@ -275,11 +276,16 @@ class TdeSubProcess: return '[TdeSubProc: pid = {}, status = {}]'.format( self.getPid(), self.getStatus() ) - def getStdOut(self): - return self._popen.stdout + def getStdOut(self) -> BinaryIO : + if self._popen.universal_newlines : # alias of text_mode + raise CrashGenError("We need binary mode for STDOUT IPC") + # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout))) + return typing.cast(BinaryIO, self._popen.stdout) - def getStdErr(self): - return self._popen.stderr + def getStdErr(self) -> BinaryIO : + if self._popen.universal_newlines : # alias of text_mode + raise CrashGenError("We need binary mode for STDERR IPC") + return typing.cast(BinaryIO, self._popen.stderr) # Now it's always running, since we matched the life cycle # def isRunning(self): @@ -846,7 +852,7 @@ class ServiceManagerThread: def procIpcBatch(self, trimToTarget=0, forceOutput=False): ''' Process a batch of STDOUT/STDERR data, until we read EMPTY from - the pipe. + the queue. 
''' self._trimQueue(trimToTarget) # trim if necessary # Process all the output generated by the underlying sub process, @@ -876,79 +882,78 @@ class ServiceManagerThread: print(pBar, end="", flush=True) print('\b\b\b\b', end="", flush=True) - BinaryLine = NewType('BinaryLine', bytes) # line with binary data, directly from STDOUT, etc. - TextLine = NewType('TextLine', str) # properly decoded, suitable for printing, etc. - x = TextLine('xyz') - + BinaryChunk = NewType('BinaryChunk', bytes) # line with binary data, directly from STDOUT, etc. + TextChunk = NewType('TextChunk', str) # properly decoded, suitable for printing, etc. + @classmethod - def _decodeBinLine(cls, bLine: BinaryLine) -> Optional[TextLine] : + def _decodeBinaryChunk(cls, bChunk: bytes) -> Optional[TextChunk] : try: - tLine = bLine.decode("utf-8").rstrip() - return cls.TextLine(tLine) + tChunk = bChunk.decode("utf-8").rstrip() + return cls.TextChunk(tChunk) except UnicodeError: - print("\nNon-UTF8 server output: {}\n".format(bLine.decode('cp437'))) + print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437'))) return None - def svcOutputReader(self, out: IO, queue, logDir: str): + def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str + ) -> Generator[TextChunk, None, None]: + ''' + Take an input stream with binary data, produce a generator of decoded + "text chunks", and also save the original binary data in a log file. + ''' + os.makedirs(logDir, exist_ok=True) + logF = open(os.path.join(logDir, logFile), 'wb') + for bChunk in iter(streamIn.readline, b''): + logF.write(bChunk) # Write to log file immediately + tChunk = self._decodeBinaryChunk(bChunk) # decode + if tChunk is not None: + yield tChunk # TODO: split into actual text lines + + # At the end... 
+ streamIn.close() # Close the stream + logF.close() # Close the output file + + def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. - :param out: the IO stream object used to fetch the data from - :param queue: the queue where we dump the roughly parsed line-by-line data + :param stdOut: the IO stream object used to fetch the data from + :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data :param logDir: where we should dump a verbatim output file ''' - os.makedirs(logDir, exist_ok=True) - logFile = os.path.join(logDir,'stdout.log') - fOut = open(logFile, 'wb') + # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") - # for line in out : - out.readline() - for bLine in iter(out.readline, b''): - fOut.write(bLine) - # print("Finished reading a line: {}".format(line)) - # print("Adding item to queue...") - - # Moved to above - # try: - # line = line.decode("utf-8").rstrip() - # except UnicodeError: - # print("\nNon-UTF8 server output: {}\n".format(line)) - tLine = self._decodeBinLine(bLine) - - if tLine is not None: - # This might block, and then causing "out" buffer to block - queue.put(tLine) - self._printProgress("_i") - - if self._status.isStarting(): # we are starting, let's see if we have started - if tLine.find(self.TD_READY_MSG) != -1: # found - Logging.info("Waiting for the service to become FULLY READY") - time.sleep(1.0) # wait for the server to truly start. TODO: remove this - Logging.info("Service is now FULLY READY") # TODO: more ID info here? 
- self._status.set(Status.STATUS_RUNNING) - - # Trim the queue if necessary: TODO: try this 1 out of 10 times - self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size - - if self._status.isStopping(): # TODO: use thread status instead - # WAITING for stopping sub process to finish its outptu - print("_w", end="", flush=True) + # stdOut.readline() # Skip the first output? TODO: remove? + for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') : + queue.put(tChunk) # tChunk guaranteed not to be None + self._printProgress("_i") + + if self._status.isStarting(): # we are starting, let's see if we have started + if tChunk.find(self.TD_READY_MSG) != -1: # found + Logging.info("Waiting for the service to become FULLY READY") + time.sleep(1.0) # wait for the server to truly start. TODO: remove this + Logging.info("Service is now FULLY READY") # TODO: more ID info here? + self._status.set(Status.STATUS_RUNNING) + + # Trim the queue if necessary: TODO: try this 1 out of 10 times + self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size + + if self._status.isStopping(): # TODO: use thread status instead + # WAITING for stopping sub process to finish its output + print("_w", end="", flush=True) # queue.put(line) - # meaning sub process must have died + # stdOut has no more data, meaning sub process must have died Logging.info("EOF found TDengine STDOUT, marking the process as terminated") self.setStatus(Status.STATUS_STOPPED) - out.close() # Close the stream - fOut.close() # Close the output file - def svcErrorReader(self, err: IO, queue, logDir: str): - os.makedirs(logDir, exist_ok=True) - logFile = os.path.join(logDir,'stderr.log') - fErr = open(logFile, 'wb') - for line in iter(err.readline, b''): - fErr.write(line) - Logging.info("TDengine STDERR: {}".format(line)) + def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str): + # os.makedirs(logDir, exist_ok=True) + # logFile = os.path.join(logDir,'stderr.log') + # fErr = open(logFile, 
'wb') + # for line in iter(err.readline, b''): + for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') : + queue.put(tChunk) # tChunk guaranteed not to be None + # fErr.write(line) + Logging.info("TDengine STDERR: {}".format(tChunk)) Logging.info("EOF for TDengine STDERR") - err.close() - fErr.close() \ No newline at end of file -- GitLab