From 8a6fd8df16b8f9dcf18210f6057b79ee4815c868 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 28 Apr 2021 02:42:27 +0000 Subject: [PATCH] Refactoring service_manager in crash_gen to use stronger types --- tests/pytest/crash_gen/__init__.py | 5 + tests/pytest/crash_gen/crash_gen_main.py | 33 ++-- tests/pytest/crash_gen/service_manager.py | 183 ++++++++++++---------- tests/pytest/crash_gen/settings.py | 13 +- 4 files changed, 132 insertions(+), 102 deletions(-) create mode 100644 tests/pytest/crash_gen/__init__.py diff --git a/tests/pytest/crash_gen/__init__.py b/tests/pytest/crash_gen/__init__.py new file mode 100644 index 0000000000..71855bc5a9 --- /dev/null +++ b/tests/pytest/crash_gen/__init__.py @@ -0,0 +1,5 @@ +# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645 +from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess +from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress +from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager +from crash_gen.settings import Settings diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 44295e8bee..39475a32d0 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -32,21 +32,22 @@ import getopt import sys import os +import io import signal import traceback import resource # from guppy import hpy import gc -from crash_gen.service_manager import ServiceManager, TdeInstance -from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager -import crash_gen.settings +# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess +from crash_gen import ServiceManager, Settings, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \ + CrashGenError, Progress, MyTDSql, \ + TdeInstance import taos import requests -crash_gen.settings.init() +Settings.init() # Require Python 3 if sys.version_info[0] < 3: @@ -89,9 +90,9 @@ class WorkerThread: self._dbConn = DbConn.createRest(tInst.getDbTarget()) elif gConfig.connector_type == 'mixed': if Dice.throw(2) == 0: # 1/2 chance - self._dbConn = DbConn.createNative() + self._dbConn = DbConn.createNative(tInst.getDbTarget()) else: - self._dbConn = DbConn.createRest() + self._dbConn = DbConn.createRest(tInst.getDbTarget()) else: raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) @@ -1370,13 +1371,13 @@ class Task(): self._err = e self._aborted = True traceback.print_exc() - except BaseException as e: + except BaseException as e2: self.logInfo("Python base exception encountered") - self._err = e + # self._err = e2 # Exception/BaseException incompatible! self._aborted = True traceback.print_exc() - except BaseException: # TODO: what is this again??!! - raise RuntimeError("Punt") + # except BaseException: # TODO: what is this again??!! + # raise RuntimeError("Punt") # self.logDebug( # "[=] Unexpected exception, SQL: {}".format( # wt.getDbConn().getLastSql())) @@ -1980,8 +1981,8 @@ class TaskAddData(StateTransitionTask): activeTable: Set[int] = set() # We use these two files to record operations to DB, useful for power-off tests - fAddLogReady = None # type: TextIOWrapper - fAddLogDone = None # type: TextIOWrapper + fAddLogReady = None # type: io.TextIOWrapper + fAddLogDone = None # type: io.TextIOWrapper @classmethod def prepToRecordOps(cls): @@ -2025,7 +2026,7 @@ class TaskAddData(StateTransitionTask): self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() - os.fsync(self.fAddLogReady) + os.fsync(self.fAddLogReady.fileno()) # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName @@ -2088,7 +2089,7 @@ class TaskAddData(StateTransitionTask): if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() - os.fsync(self.fAddLogDone) + os.fsync(self.fAddLogDone.fileno()) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access @@ -2468,7 +2469,7 @@ class MainExec: global gConfig gConfig = parser.parse_args() - crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var + Settings.setConfig(gConfig) # TODO: fix this hack, consolidate this global var # Sanity check for arguments if gConfig.use_shadow_db and gConfig.max_dbs>1 : diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index cdbf2db4da..146215f2bc 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import io import sys @@ -5,9 +7,9 @@ import threading import signal import logging import time -import subprocess +from subprocess import PIPE, Popen, TimeoutExpired -from typing import IO, List +from typing import IO, List, NewType, Optional try: import psutil @@ -170,6 +172,7 @@ quorum 2 if crash_gen.settings.gConfig.track_memory_leaks: Logging.info("Invoking VALGRIND on service...") cmdLine = ['valgrind', '--leak-check=yes'] + # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return cmdLine @@ -225,41 +228,46 @@ class TdeSubProcess: It takes a TdeInstance object as its parameter, with the rationale being "a sub process runs an instance". + + We aim to ensure that this object has exactly the same life-cycle as the + underlying sub process. """ # RET_ALREADY_STOPPED = -1 # RET_TIME_OUT = -3 # RET_SUCCESS = -4 - def __init__(self): - self.subProcess = None # type: subprocess.Popen + def __init__(self, po: Popen): + self._popen = po # type: Popen # if tInst is None: # raise CrashGenError("Empty instance not allowed in TdeSubProcess") # self._tInst = tInst # Default create at ServiceManagerThread def __repr__(self): - if self.subProcess is None: - return '[TdeSubProc: Empty]' + # if self.subProcess is None: + # return '[TdeSubProc: Empty]' return '[TdeSubProc: pid = {}]'.format(self.getPid()) def getStdOut(self): - return self.subProcess.stdout + return self._popen.stdout def getStdErr(self): - return self.subProcess.stderr + return self._popen.stderr - def isRunning(self): - return self.subProcess is not None + # Now it's always running, since we matched the life cycle + # def isRunning(self): + # return self.subProcess is not None def getPid(self): - return self.subProcess.pid + return self._popen.pid - def start(self, cmdLine): + @classmethod + def start(cls, cmdLine): ON_POSIX = 'posix' in sys.builtin_module_names # Sanity check - if self.subProcess: # already there - raise RuntimeError("Corrupt process state") + # if self.subProcess: # already there + # raise RuntimeError("Corrupt process state") # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment @@ -270,23 +278,22 @@ class TdeSubProcess: # print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine via Shell: {}".format(cmdLineStr)) - useShell = True # Needed to pass environments into it - self.subProcess = subprocess.Popen( - # ' '.join(cmdLine) if useShell else cmdLine, - # shell=useShell, - ' '.join(cmdLine), - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - # bufsize=1, # not supported in binary mode + # useShell = True # Needed to pass environments into it + popen = Popen( + ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, + shell=True, # Always use shell, since we need to pass ENV vars + stdout=PIPE, + stderr=PIPE, close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF + return cls(popen) STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm - def stop(self): + @classmethod + def stop(cls, tsp: TdeSubProcess): """ Stop a sub process, DO NOT return anything, process all conditions INSIDE @@ -306,29 +313,30 @@ class TdeSubProcess: SIGSEGV 11 SIGUSR2 12 """ - if not self.subProcess: - Logging.error("Sub process already stopped") - return + # self._popen should always be valid. + + # if not self.subProcess: + # Logging.error("Sub process already stopped") + # return - retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) + retCode = tsp._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N) if retCode: # valid return code, process ended # retCode = -retCode # only if valid Logging.warning("TSP.stop(): process ended itself") - self.subProcess = None + # self.subProcess = None return # process still alive, let's interrupt it - self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception - self.subProcess = None + cls._stopForSure(tsp._popen, cls.STOP_SIGNAL) # success if no exception # sub process should end, then IPC queue should end, causing IO thread to end @classmethod - def _stopForSure(cls, proc: subprocess.Popen, sig: int): + def _stopForSure(cls, proc: Popen, sig: int): ''' Stop a process and all sub processes with a singal, and SIGKILL if necessary ''' - def doKillTdService(proc: subprocess.Popen, sig: int): + def doKillTdService(proc: Popen, sig: int): Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig)) proc.send_signal(sig) try: @@ -340,7 +348,7 @@ class TdeSubProcess: else: Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) return True # terminated successfully - except subprocess.TimeoutExpired as err: + except TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig)) return False # failed to terminate @@ -361,10 +369,10 @@ class TdeSubProcess: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) return False # did not terminate - def doKill(proc: subprocess.Popen, sig: int): + def doKill(proc: Popen, sig: int): pid = proc.pid try: - topSubProc = psutil.Process(pid) + topSubProc = psutil.Process(pid) # Now that we are doing "exec -c", should not have children any more for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False Logging.warning("Unexpected child to be killed") doKillChild(child, sig) @@ -389,17 +397,15 @@ class TdeSubProcess: return doKill(proc, sig) def hardKill(proc): - return doKill(proc, signal.SIGKILL) - - + return doKill(proc, signal.SIGKILL) pid = proc.pid Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig)) if softKill(proc, sig): - return# success + return # success if sig != signal.SIGKILL: # really was soft above if hardKill(proc): - return + return raise CrashGenError("Failed to stop process, pid={}".format(pid)) class ServiceManager: @@ -657,10 +663,9 @@ class ServiceManagerThread: Logging.info("Attempting to start TAOS service: {}".format(self)) self._status.set(Status.STATUS_STARTING) - self._tdeSubProcess = TdeSubProcess() - self._tdeSubProcess.start(cmdLine) # TODO: verify process is running + self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running - self._ipcQueue = Queue() + self._ipcQueue = Queue() # type: Queue self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) @@ -738,21 +743,15 @@ class ServiceManagerThread: raise RuntimeError("sub process object missing") self._status.set(Status.STATUS_STOPPING) - # retCode = self._tdeSubProcess.stop() - # try: - # retCode = self._tdeSubProcess.stop() - # # print("Attempted to stop sub process, got return code: {}".format(retCode)) - # if retCode == signal.SIGSEGV : # SGV - # Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") - # except subprocess.TimeoutExpired as err: - # Logging.info("Time out waiting for TDengine service process to exit") - if not self._tdeSubProcess.stop(): # everything withing - if self._tdeSubProcess.isRunning(): # still running, should now never happen - Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( - self._tdeSubProcess.getPid())) - else: - self._tdeSubProcess = None # not running any more - self.join() # stop the thread, change the status, etc. + TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what + self._tdeSubProcess = None + # if not self._tdeSubProcess.stop(): # everything withing + # if self._tdeSubProcess.isRunning(): # still running, should now never happen + # Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( + # self._tdeSubProcess.getPid())) + # else: + # self._tdeSubProcess = None # not running any more + self.join() # stop the thread, change the status, etc. # Check if it's really stopped outputLines = 10 # for last output @@ -827,6 +826,19 @@ class ServiceManagerThread: print(pBar, end="", flush=True) print('\b\b\b\b', end="", flush=True) + BinaryLine = NewType('BinaryLine', bytes) # line with binary data, directly from STDOUT, etc. + TextLine = NewType('TextLine', str) # properly decoded, suitable for printing, etc. + x = TextLine('xyz') + + @classmethod + def _decodeBinLine(cls, bLine: BinaryLine) -> Optional[TextLine] : + try: + tLine = bLine.decode("utf-8").rstrip() + return cls.TextLine(tLine) + except UnicodeError: + print("\nNon-UTF8 server output: {}\n".format(bLine.decode('cp437'))) + return None + def svcOutputReader(self, out: IO, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. @@ -841,32 +853,37 @@ class ServiceManagerThread: # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") # for line in out : - for line in iter(out.readline, b''): - fOut.write(line) + out.readline() + for bLine in iter(out.readline, b''): + fOut.write(bLine) # print("Finished reading a line: {}".format(line)) # print("Adding item to queue...") - try: - line = line.decode("utf-8").rstrip() - except UnicodeError: - print("\nNon-UTF8 server output: {}\n".format(line)) - - # This might block, and then causing "out" buffer to block - queue.put(line) - self._printProgress("_i") - - if self._status.isStarting(): # we are starting, let's see if we have started - if line.find(self.TD_READY_MSG) != -1: # found - Logging.info("Waiting for the service to become FULLY READY") - time.sleep(1.0) # wait for the server to truly start. TODO: remove this - Logging.info("Service is now FULLY READY") # TODO: more ID info here? - self._status.set(Status.STATUS_RUNNING) - - # Trim the queue if necessary: TODO: try this 1 out of 10 times - self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size - - if self._status.isStopping(): # TODO: use thread status instead - # WAITING for stopping sub process to finish its outptu - print("_w", end="", flush=True) + + # Moved to above + # try: + # line = line.decode("utf-8").rstrip() + # except UnicodeError: + # print("\nNon-UTF8 server output: {}\n".format(line)) + tLine = self._decodeBinLine(bLine) + + if tLine is not None: + # This might block, and then causing "out" buffer to block + queue.put(tLine) + self._printProgress("_i") + + if self._status.isStarting(): # we are starting, let's see if we have started + if tLine.find(self.TD_READY_MSG) != -1: # found + Logging.info("Waiting for the service to become FULLY READY") + time.sleep(1.0) # wait for the server to truly start. TODO: remove this + Logging.info("Service is now FULLY READY") # TODO: more ID info here? + self._status.set(Status.STATUS_RUNNING) + + # Trim the queue if necessary: TODO: try this 1 out of 10 times + self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size + + if self._status.isStopping(): # TODO: use thread status instead + # WAITING for stopping sub process to finish its outptu + print("_w", end="", flush=True) # queue.put(line) # meaning sub process must have died diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py index 3c4c91e6e0..ae0132378d 100644 --- a/tests/pytest/crash_gen/settings.py +++ b/tests/pytest/crash_gen/settings.py @@ -3,6 +3,13 @@ import argparse gConfig: argparse.Namespace -def init(): - global gConfig - gConfig = [] \ No newline at end of file +class Settings: + @classmethod + def init(cls): + global gConfig + gConfig = [] + + @classmethod + def setConfig(cls, config): + global gConfig + gConfig = config \ No newline at end of file -- GitLab