Commit 34eb7e25 authored by Steven Li

Used Python generator to simplify crash_gen IPC processing of STDOUT/STDERR

Parent: fbadcfb5
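For reference, here is a minimal, self-contained sketch of the generator pattern this commit moves to: a helper reads a subprocess's binary STDOUT line by line, mirrors the raw bytes to a verbatim log file, and yields decoded text chunks that the caller can push onto a queue. The names (`text_chunks`, `decode_chunk`) and the demo command are illustrative only and are not part of crash_gen.

```python
import os
from subprocess import PIPE, Popen
from typing import BinaryIO, Generator, Optional


def decode_chunk(b_chunk: bytes) -> Optional[str]:
    """Decode one binary chunk as UTF-8; return None if it is not valid UTF-8."""
    try:
        return b_chunk.decode("utf-8").rstrip()
    except UnicodeError:
        return None


def text_chunks(stream_in: BinaryIO, log_path: str) -> Generator[str, None, None]:
    """Yield decoded text chunks from a binary stream, saving the raw bytes verbatim."""
    with open(log_path, "wb") as log_f:
        for b_chunk in iter(stream_in.readline, b""):  # b"" marks EOF on the pipe
            log_f.write(b_chunk)                       # keep an exact binary copy
            t_chunk = decode_chunk(b_chunk)
            if t_chunk is not None:
                yield t_chunk
    stream_in.close()


if __name__ == "__main__":
    # Demo (hypothetical): spawn a child in binary (non-text) mode and consume its output lazily.
    proc = Popen(["echo", "hello from the child"], stdout=PIPE)
    assert proc.stdout is not None
    for chunk in text_chunks(proc.stdout, os.path.join("/tmp", "stdout.log")):
        print("got chunk:", chunk)
    proc.wait()
```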
@@ -9,7 +9,8 @@ import signal
 import logging
 import time
 from subprocess import PIPE, Popen, TimeoutExpired
-from typing import IO, List, NewType, Optional
+from typing import BinaryIO, Generator, IO, List, NewType, Optional
+import typing
 
 try:
     import psutil
@@ -275,11 +276,16 @@ class TdeSubProcess:
         return '[TdeSubProc: pid = {}, status = {}]'.format(
             self.getPid(), self.getStatus() )
 
-    def getStdOut(self):
-        return self._popen.stdout
+    def getStdOut(self) -> BinaryIO :
+        if self._popen.universal_newlines : # alias of text_mode
+            raise CrashGenError("We need binary mode for STDOUT IPC")
+        # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
+        return typing.cast(BinaryIO, self._popen.stdout)
 
-    def getStdErr(self):
-        return self._popen.stderr
+    def getStdErr(self) -> BinaryIO :
+        if self._popen.universal_newlines : # alias of text_mode
+            raise CrashGenError("We need binary mode for STDERR IPC")
+        return typing.cast(BinaryIO, self._popen.stderr)
 
     # Now it's always running, since we matched the life cycle
     # def isRunning(self):
@@ -846,7 +852,7 @@ class ServiceManagerThread:
     def procIpcBatch(self, trimToTarget=0, forceOutput=False):
         '''
         Process a batch of STDOUT/STDERR data, until we read EMPTY from
-        the pipe.
+        the queue.
         '''
         self._trimQueue(trimToTarget) # trim if necessary
         # Process all the output generated by the underlying sub process,
@@ -876,53 +882,54 @@ class ServiceManagerThread:
             print(pBar, end="", flush=True)
             print('\b\b\b\b', end="", flush=True)
 
-    BinaryLine = NewType('BinaryLine', bytes) # line with binary data, directly from STDOUT, etc.
-    TextLine = NewType('TextLine', str) # properly decoded, suitable for printing, etc.
-    x = TextLine('xyz')
+    BinaryChunk = NewType('BinaryChunk', bytes) # line with binary data, directly from STDOUT, etc.
+    TextChunk = NewType('TextChunk', str) # properly decoded, suitable for printing, etc.
 
     @classmethod
-    def _decodeBinLine(cls, bLine: BinaryLine) -> Optional[TextLine] :
+    def _decodeBinaryChunk(cls, bChunk: bytes) -> Optional[TextChunk] :
         try:
-            tLine = bLine.decode("utf-8").rstrip()
-            return cls.TextLine(tLine)
+            tChunk = bChunk.decode("utf-8").rstrip()
+            return cls.TextChunk(tChunk)
         except UnicodeError:
-            print("\nNon-UTF8 server output: {}\n".format(bLine.decode('cp437')))
+            print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
             return None
-    def svcOutputReader(self, out: IO, queue, logDir: str):
+    def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
+                            ) -> Generator[TextChunk, None, None]:
+        '''
+        Take an input stream with binary data, produce a generator of decoded
+        "text chunks", and also save the original binary data in a log file.
+        '''
+        os.makedirs(logDir, exist_ok=True)
+        logF = open(os.path.join(logDir, logFile), 'wb')
+        for bChunk in iter(streamIn.readline, b''):
+            logF.write(bChunk) # Write to log file immediately
+            tChunk = self._decodeBinaryChunk(bChunk) # decode
+            if tChunk is not None:
+                yield tChunk # TODO: split into actual text lines
+        # At the end...
+        streamIn.close() # Close the stream
+        logF.close() # Close the output file
+
+    def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
         '''
         The infinite routine that processes the STDOUT stream for the sub process being managed.
 
-        :param out: the IO stream object used to fetch the data from
-        :param queue: the queue where we dump the roughly parsed line-by-line data
+        :param stdOut: the IO stream object used to fetch the data from
+        :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
         :param logDir: where we should dump a verbatim output file
         '''
-        os.makedirs(logDir, exist_ok=True)
-        logFile = os.path.join(logDir,'stdout.log')
-        fOut = open(logFile, 'wb')
         # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
         # print("This is the svcOutput Reader...")
-        # for line in out :
-        out.readline()
-        for bLine in iter(out.readline, b''):
-            fOut.write(bLine)
-            # print("Finished reading a line: {}".format(line))
-            # print("Adding item to queue...")
-            # Moved to above
-            # try:
-            #     line = line.decode("utf-8").rstrip()
-            # except UnicodeError:
-            #     print("\nNon-UTF8 server output: {}\n".format(line))
-            tLine = self._decodeBinLine(bLine)
-            if tLine is not None:
-                # This might block, and then causing "out" buffer to block
-                queue.put(tLine)
+        # stdOut.readline() # Skip the first output? TODO: remove?
+        for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
+            queue.put(tChunk) # tChunk guaranteed not to be None
self._printProgress("_i") self._printProgress("_i")
if self._status.isStarting(): # we are starting, let's see if we have started if self._status.isStarting(): # we are starting, let's see if we have started
if tLine.find(self.TD_READY_MSG) != -1: # found if tChunk.find(self.TD_READY_MSG) != -1: # found
Logging.info("Waiting for the service to become FULLY READY") Logging.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this time.sleep(1.0) # wait for the server to truly start. TODO: remove this
Logging.info("Service is now FULLY READY") # TODO: more ID info here? Logging.info("Service is now FULLY READY") # TODO: more ID info here?
@@ -936,19 +943,17 @@ class ServiceManagerThread:
                     print("_w", end="", flush=True)
             # queue.put(line)
-        # meaning sub process must have died
+        # stdOut has no more data, meaning sub process must have died
         Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
         self.setStatus(Status.STATUS_STOPPED)
-        out.close() # Close the stream
-        fOut.close() # Close the output file
 
-    def svcErrorReader(self, err: IO, queue, logDir: str):
-        os.makedirs(logDir, exist_ok=True)
-        logFile = os.path.join(logDir,'stderr.log')
-        fErr = open(logFile, 'wb')
-        for line in iter(err.readline, b''):
-            fErr.write(line)
-            Logging.info("TDengine STDERR: {}".format(line))
+    def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
+        # os.makedirs(logDir, exist_ok=True)
+        # logFile = os.path.join(logDir,'stderr.log')
+        # fErr = open(logFile, 'wb')
+        # for line in iter(err.readline, b''):
+        for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
+            queue.put(tChunk) # tChunk guaranteed not to be None
+            # fErr.write(line)
+            Logging.info("TDengine STDERR: {}".format(tChunk))
         Logging.info("EOF for TDengine STDERR")
-        err.close()
-        fErr.close()
\ No newline at end of file