提交 8a6fd8df 编写于 作者: S Steven Li

Refactoring service_manager in crash_gen to use stronger types

上级 4df6967d
# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645
from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
from crash_gen.settings import Settings
...@@ -32,21 +32,22 @@ import getopt ...@@ -32,21 +32,22 @@ import getopt
import sys import sys
import os import os
import io
import signal import signal
import traceback import traceback
import resource import resource
# from guppy import hpy # from guppy import hpy
import gc import gc
from crash_gen.service_manager import ServiceManager, TdeInstance # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen import ServiceManager, Settings, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager CrashGenError, Progress, MyTDSql, \
import crash_gen.settings TdeInstance
import taos import taos
import requests import requests
crash_gen.settings.init() Settings.init()
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
...@@ -89,9 +90,9 @@ class WorkerThread: ...@@ -89,9 +90,9 @@ class WorkerThread:
self._dbConn = DbConn.createRest(tInst.getDbTarget()) self._dbConn = DbConn.createRest(tInst.getDbTarget())
elif gConfig.connector_type == 'mixed': elif gConfig.connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance if Dice.throw(2) == 0: # 1/2 chance
self._dbConn = DbConn.createNative() self._dbConn = DbConn.createNative(tInst.getDbTarget())
else: else:
self._dbConn = DbConn.createRest() self._dbConn = DbConn.createRest(tInst.getDbTarget())
else: else:
raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))
...@@ -1370,13 +1371,13 @@ class Task(): ...@@ -1370,13 +1371,13 @@ class Task():
self._err = e self._err = e
self._aborted = True self._aborted = True
traceback.print_exc() traceback.print_exc()
except BaseException as e: except BaseException as e2:
self.logInfo("Python base exception encountered") self.logInfo("Python base exception encountered")
self._err = e # self._err = e2 # Exception/BaseException incompatible!
self._aborted = True self._aborted = True
traceback.print_exc() traceback.print_exc()
except BaseException: # TODO: what is this again??!! # except BaseException: # TODO: what is this again??!!
raise RuntimeError("Punt") # raise RuntimeError("Punt")
# self.logDebug( # self.logDebug(
# "[=] Unexpected exception, SQL: {}".format( # "[=] Unexpected exception, SQL: {}".format(
# wt.getDbConn().getLastSql())) # wt.getDbConn().getLastSql()))
...@@ -1980,8 +1981,8 @@ class TaskAddData(StateTransitionTask): ...@@ -1980,8 +1981,8 @@ class TaskAddData(StateTransitionTask):
activeTable: Set[int] = set() activeTable: Set[int] = set()
# We use these two files to record operations to DB, useful for power-off tests # We use these two files to record operations to DB, useful for power-off tests
fAddLogReady = None # type: TextIOWrapper fAddLogReady = None # type: io.TextIOWrapper
fAddLogDone = None # type: TextIOWrapper fAddLogDone = None # type: io.TextIOWrapper
@classmethod @classmethod
def prepToRecordOps(cls): def prepToRecordOps(cls):
...@@ -2025,7 +2026,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2025,7 +2026,7 @@ class TaskAddData(StateTransitionTask):
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.flush() self.fAddLogReady.flush()
os.fsync(self.fAddLogReady) os.fsync(self.fAddLogReady.fileno())
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
...@@ -2088,7 +2089,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2088,7 +2089,7 @@ class TaskAddData(StateTransitionTask):
if gConfig.record_ops: if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush() self.fAddLogDone.flush()
os.fsync(self.fAddLogDone) os.fsync(self.fAddLogDone.fileno())
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
...@@ -2468,7 +2469,7 @@ class MainExec: ...@@ -2468,7 +2469,7 @@ class MainExec:
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var Settings.setConfig(gConfig) # TODO: fix this hack, consolidate this global var
# Sanity check for arguments # Sanity check for arguments
if gConfig.use_shadow_db and gConfig.max_dbs>1 : if gConfig.use_shadow_db and gConfig.max_dbs>1 :
......
from __future__ import annotations
import os import os
import io import io
import sys import sys
...@@ -5,9 +7,9 @@ import threading ...@@ -5,9 +7,9 @@ import threading
import signal import signal
import logging import logging
import time import time
import subprocess from subprocess import PIPE, Popen, TimeoutExpired
from typing import IO, List from typing import IO, List, NewType, Optional
try: try:
import psutil import psutil
...@@ -170,6 +172,7 @@ quorum 2 ...@@ -170,6 +172,7 @@ quorum 2
if crash_gen.settings.gConfig.track_memory_leaks: if crash_gen.settings.gConfig.track_memory_leaks:
Logging.info("Invoking VALGRIND on service...") Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes'] cmdLine = ['valgrind', '--leak-check=yes']
# TODO: move "exec -c" into Popen(); that way we can both "use shell" and NOT fork, so as not to lose kill control
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
return cmdLine return cmdLine
...@@ -225,41 +228,46 @@ class TdeSubProcess: ...@@ -225,41 +228,46 @@ class TdeSubProcess:
It takes a TdeInstance object as its parameter, with the rationale being It takes a TdeInstance object as its parameter, with the rationale being
"a sub process runs an instance". "a sub process runs an instance".
We aim to ensure that this object has exactly the same life-cycle as the
underlying sub process.
""" """
# RET_ALREADY_STOPPED = -1 # RET_ALREADY_STOPPED = -1
# RET_TIME_OUT = -3 # RET_TIME_OUT = -3
# RET_SUCCESS = -4 # RET_SUCCESS = -4
def __init__(self): def __init__(self, po: Popen):
self.subProcess = None # type: subprocess.Popen self._popen = po # type: Popen
# if tInst is None: # if tInst is None:
# raise CrashGenError("Empty instance not allowed in TdeSubProcess") # raise CrashGenError("Empty instance not allowed in TdeSubProcess")
# self._tInst = tInst # Default create at ServiceManagerThread # self._tInst = tInst # Default create at ServiceManagerThread
def __repr__(self): def __repr__(self):
if self.subProcess is None: # if self.subProcess is None:
return '[TdeSubProc: Empty]' # return '[TdeSubProc: Empty]'
return '[TdeSubProc: pid = {}]'.format(self.getPid()) return '[TdeSubProc: pid = {}]'.format(self.getPid())
def getStdOut(self): def getStdOut(self):
return self.subProcess.stdout return self._popen.stdout
def getStdErr(self): def getStdErr(self):
return self.subProcess.stderr return self._popen.stderr
def isRunning(self): # Now it's always running, since we matched the life cycle
return self.subProcess is not None # def isRunning(self):
# return self.subProcess is not None
def getPid(self): def getPid(self):
return self.subProcess.pid return self._popen.pid
def start(self, cmdLine): @classmethod
def start(cls, cmdLine):
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check # Sanity check
if self.subProcess: # already there # if self.subProcess: # already there
raise RuntimeError("Corrupt process state") # raise RuntimeError("Corrupt process state")
# Prepare environment variables for coverage information # Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
...@@ -270,23 +278,22 @@ class TdeSubProcess: ...@@ -270,23 +278,22 @@ class TdeSubProcess:
# print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr)) # print("Starting TDengine via Shell: {}".format(cmdLineStr))
useShell = True # Needed to pass environments into it # useShell = True # Needed to pass environments into it
self.subProcess = subprocess.Popen( popen = Popen(
# ' '.join(cmdLine) if useShell else cmdLine, ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
# shell=useShell, shell=True, # Always use shell, since we need to pass ENV vars
' '.join(cmdLine), stdout=PIPE,
shell=True, stderr=PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX, close_fds=ON_POSIX,
env=myEnv env=myEnv
) # had text=True, which interferred with reading EOF ) # had text=True, which interferred with reading EOF
return cls(popen)
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
def stop(self): @classmethod
def stop(cls, tsp: TdeSubProcess):
""" """
Stop a sub process, DO NOT return anything, process all conditions INSIDE Stop a sub process, DO NOT return anything, process all conditions INSIDE
...@@ -306,29 +313,30 @@ class TdeSubProcess: ...@@ -306,29 +313,30 @@ class TdeSubProcess:
SIGSEGV 11 SIGSEGV 11
SIGUSR2 12 SIGUSR2 12
""" """
if not self.subProcess: # self._popen should always be valid.
Logging.error("Sub process already stopped")
return # if not self.subProcess:
# Logging.error("Sub process already stopped")
# return
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) retCode = tsp._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
if retCode: # valid return code, process ended if retCode: # valid return code, process ended
# retCode = -retCode # only if valid # retCode = -retCode # only if valid
Logging.warning("TSP.stop(): process ended itself") Logging.warning("TSP.stop(): process ended itself")
self.subProcess = None # self.subProcess = None
return return
# process still alive, let's interrupt it # process still alive, let's interrupt it
self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception cls._stopForSure(tsp._popen, cls.STOP_SIGNAL) # success if no exception
self.subProcess = None
# sub process should end, then IPC queue should end, causing IO thread to end # sub process should end, then IPC queue should end, causing IO thread to end
@classmethod @classmethod
def _stopForSure(cls, proc: subprocess.Popen, sig: int): def _stopForSure(cls, proc: Popen, sig: int):
''' '''
Stop a process and all sub processes with a singal, and SIGKILL if necessary Stop a process and all sub processes with a singal, and SIGKILL if necessary
''' '''
def doKillTdService(proc: subprocess.Popen, sig: int): def doKillTdService(proc: Popen, sig: int):
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig)) Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
proc.send_signal(sig) proc.send_signal(sig)
try: try:
...@@ -340,7 +348,7 @@ class TdeSubProcess: ...@@ -340,7 +348,7 @@ class TdeSubProcess:
else: else:
Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
return True # terminated successfully return True # terminated successfully
except subprocess.TimeoutExpired as err: except TimeoutExpired as err:
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig)) Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
return False # failed to terminate return False # failed to terminate
...@@ -361,10 +369,10 @@ class TdeSubProcess: ...@@ -361,10 +369,10 @@ class TdeSubProcess:
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
return False # did not terminate return False # did not terminate
def doKill(proc: subprocess.Popen, sig: int): def doKill(proc: Popen, sig: int):
pid = proc.pid pid = proc.pid
try: try:
topSubProc = psutil.Process(pid) topSubProc = psutil.Process(pid) # Now that we are doing "exec -c", should not have children any more
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
Logging.warning("Unexpected child to be killed") Logging.warning("Unexpected child to be killed")
doKillChild(child, sig) doKillChild(child, sig)
...@@ -389,17 +397,15 @@ class TdeSubProcess: ...@@ -389,17 +397,15 @@ class TdeSubProcess:
return doKill(proc, sig) return doKill(proc, sig)
def hardKill(proc): def hardKill(proc):
return doKill(proc, signal.SIGKILL) return doKill(proc, signal.SIGKILL)
pid = proc.pid pid = proc.pid
Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig)) Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
if softKill(proc, sig): if softKill(proc, sig):
return# success return # success
if sig != signal.SIGKILL: # really was soft above if sig != signal.SIGKILL: # really was soft above
if hardKill(proc): if hardKill(proc):
return return
raise CrashGenError("Failed to stop process, pid={}".format(pid)) raise CrashGenError("Failed to stop process, pid={}".format(pid))
class ServiceManager: class ServiceManager:
...@@ -657,10 +663,9 @@ class ServiceManagerThread: ...@@ -657,10 +663,9 @@ class ServiceManagerThread:
Logging.info("Attempting to start TAOS service: {}".format(self)) Logging.info("Attempting to start TAOS service: {}".format(self))
self._status.set(Status.STATUS_STARTING) self._status.set(Status.STATUS_STARTING)
self._tdeSubProcess = TdeSubProcess() self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running
self._tdeSubProcess.start(cmdLine) # TODO: verify process is running
self._ipcQueue = Queue() self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader, target=self.svcOutputReader,
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
...@@ -738,21 +743,15 @@ class ServiceManagerThread: ...@@ -738,21 +743,15 @@ class ServiceManagerThread:
raise RuntimeError("sub process object missing") raise RuntimeError("sub process object missing")
self._status.set(Status.STATUS_STOPPING) self._status.set(Status.STATUS_STOPPING)
# retCode = self._tdeSubProcess.stop() TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what
# try: self._tdeSubProcess = None
# retCode = self._tdeSubProcess.stop() # if not self._tdeSubProcess.stop(): # everything withing
# # print("Attempted to stop sub process, got return code: {}".format(retCode)) # if self._tdeSubProcess.isRunning(): # still running, should now never happen
# if retCode == signal.SIGSEGV : # SGV # Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
# Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") # self._tdeSubProcess.getPid()))
# except subprocess.TimeoutExpired as err: # else:
# Logging.info("Time out waiting for TDengine service process to exit") # self._tdeSubProcess = None # not running any more
if not self._tdeSubProcess.stop(): # everything withing self.join() # stop the thread, change the status, etc.
if self._tdeSubProcess.isRunning(): # still running, should now never happen
Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
self._tdeSubProcess.getPid()))
else:
self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
# Check if it's really stopped # Check if it's really stopped
outputLines = 10 # for last output outputLines = 10 # for last output
...@@ -827,6 +826,19 @@ class ServiceManagerThread: ...@@ -827,6 +826,19 @@ class ServiceManagerThread:
print(pBar, end="", flush=True) print(pBar, end="", flush=True)
print('\b\b\b\b', end="", flush=True) print('\b\b\b\b', end="", flush=True)
# Distinct static types for raw and decoded server output, so a type checker
# can tell bytes read from the pipe apart from text that is safe to print.
BinaryLine = NewType('BinaryLine', bytes)  # line with binary data, directly from STDOUT, etc.
TextLine = NewType('TextLine', str)  # properly decoded, suitable for printing, etc.

@classmethod
def _decodeBinLine(cls, bLine: BinaryLine) -> Optional[TextLine]:
    """
    Decode one raw output line into printable text.

    Args:
        bLine: a line of bytes, as read from the sub process output stream.

    Returns:
        The UTF-8 decoded line with trailing whitespace removed, or None when
        the bytes are not valid UTF-8 (a diagnostic is printed in that case).
    """
    try:
        tLine = bLine.decode("utf-8").rstrip()
    except UnicodeError:
        # Fall back to cp437 purely for the diagnostic: it assigns a character
        # to every byte value, so this second decode does not raise.
        print("\nNon-UTF8 server output: {}\n".format(bLine.decode('cp437')))
        return None
    return cls.TextLine(tLine)
def svcOutputReader(self, out: IO, queue, logDir: str): def svcOutputReader(self, out: IO, queue, logDir: str):
''' '''
The infinite routine that processes the STDOUT stream for the sub process being managed. The infinite routine that processes the STDOUT stream for the sub process being managed.
...@@ -841,32 +853,37 @@ class ServiceManagerThread: ...@@ -841,32 +853,37 @@ class ServiceManagerThread:
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...") # print("This is the svcOutput Reader...")
# for line in out : # for line in out :
for line in iter(out.readline, b''): out.readline()
fOut.write(line) for bLine in iter(out.readline, b''):
fOut.write(bLine)
# print("Finished reading a line: {}".format(line)) # print("Finished reading a line: {}".format(line))
# print("Adding item to queue...") # print("Adding item to queue...")
try:
line = line.decode("utf-8").rstrip() # Moved to above
except UnicodeError: # try:
print("\nNon-UTF8 server output: {}\n".format(line)) # line = line.decode("utf-8").rstrip()
# except UnicodeError:
# This might block, and then causing "out" buffer to block # print("\nNon-UTF8 server output: {}\n".format(line))
queue.put(line) tLine = self._decodeBinLine(bLine)
self._printProgress("_i")
if tLine is not None:
if self._status.isStarting(): # we are starting, let's see if we have started # This might block, and then causing "out" buffer to block
if line.find(self.TD_READY_MSG) != -1: # found queue.put(tLine)
Logging.info("Waiting for the service to become FULLY READY") self._printProgress("_i")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this
Logging.info("Service is now FULLY READY") # TODO: more ID info here? if self._status.isStarting(): # we are starting, let's see if we have started
self._status.set(Status.STATUS_RUNNING) if tLine.find(self.TD_READY_MSG) != -1: # found
Logging.info("Waiting for the service to become FULLY READY")
# Trim the queue if necessary: TODO: try this 1 out of 10 times time.sleep(1.0) # wait for the server to truly start. TODO: remove this
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size Logging.info("Service is now FULLY READY") # TODO: more ID info here?
self._status.set(Status.STATUS_RUNNING)
if self._status.isStopping(): # TODO: use thread status instead
# WAITING for stopping sub process to finish its outptu # Trim the queue if necessary: TODO: try this 1 out of 10 times
print("_w", end="", flush=True) self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self._status.isStopping(): # TODO: use thread status instead
# WAITING for stopping sub process to finish its outptu
print("_w", end="", flush=True)
# queue.put(line) # queue.put(line)
# meaning sub process must have died # meaning sub process must have died
......
...@@ -3,6 +3,13 @@ import argparse ...@@ -3,6 +3,13 @@ import argparse
gConfig: argparse.Namespace gConfig: argparse.Namespace
class Settings:
    """Entry points for initializing and replacing the module-level ``gConfig``."""

    @classmethod
    def init(cls):
        """
        Reset the module-level ``gConfig`` to an empty placeholder.

        The placeholder is an empty ``argparse.Namespace`` so that it matches
        the declared type of ``gConfig`` until ``setConfig`` installs the
        parsed command line arguments.
        """
        global gConfig
        gConfig = argparse.Namespace()

    @classmethod
    def setConfig(cls, config):
        """
        Install ``config`` as the module-level ``gConfig``.

        Args:
            config: the configuration object to publish; the caller visible in
                this change passes the result of ``parser.parse_args()``.
        """
        global gConfig
        gConfig = config
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册