提交 b77728bc 编写于 作者: S Steven Li

Refactored service side process management for the crash_gen tool

上级 f0bbd1d3
......@@ -1591,31 +1591,6 @@ class Dice():
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
# Anyone needing to carry out work should simply come here
# class WorkDispatcher():
# def __init__(self, dbState):
# # self.totalNumMethods = 2
# self.tasks = [
# # CreateTableTask(dbState), # Obsolete
# # DropTableTask(dbState),
# # AddDataTask(dbState),
# ]
# def throwDice(self):
# max = len(self.tasks) - 1
# dRes = random.randint(0, max)
# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
# return dRes
# def pickTask(self):
# dice = self.throwDice()
# return self.tasks[dice]
# def doWork(self, workerThread):
# task = self.pickTask()
# task.execute(workerThread)
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
if ( record.levelno >= logging.INFO ) :
......@@ -1633,46 +1608,15 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
def __init__(self):
print("Starting service manager")
print("Starting TDengine Service Manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler)
self.ioThread = None
self.subProcess = None
self.shouldStop = False
# self.status = MainExec.STATUS_RUNNING # set inside _startTaosService()
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
print("_i", end="", flush=True)
# Trim the queue if necessary
oneTenthQSize = self.MAX_QUEUE_SIZE // 10
if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full?
print("Triming IPC queue by: {}".format(oneTenthQSize))
for i in range(0, oneTenthQSize) :
except Empty:
break # break out of for loop, no more trimming
if self.shouldStop :
print("Stopping to read output from sub process")
# queue.put(line)
print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside _startTaosService()
self.svcMgrThread = None
def _doMenu(self):
choice = ""
......@@ -1695,10 +1639,10 @@ class SvcManager:
def sigUsrHandler(self, signalNumber, frame) :
print("Interrupting main thread execution upon SIGUSR1")
if self.status != MainExec.STATUS_RUNNING :
if self.inSigHandler : # already
print("Ignoring repeated SIG...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING
self.inSigHandler = True
choice = self._doMenu()
if choice == "1" :
......@@ -1711,67 +1655,227 @@ class SvcManager:
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...")
if self.status != MainExec.STATUS_RUNNING :
if self.inSigHandler :
print("Ignoring repeated SIG_INT...")
self.status = MainExec.STATUS_STOPPING # immediately set our status
self.inSigHandler = True
print("INT signal handler returning...")
self.inSigHandler = False
def sigHandlerResume(self) :
print("Resuming TDengine service manager thread (main thread)...\n\n")
self.status = MainExec.STATUS_RUNNING
def _checkServiceManagerThread(self):
if self.svcMgrThread: # valid svc mgr thread
if self.svcMgrThread.isStopped(): # done?
self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
self.svcMgrThread = None # no more
def _procIpcAll(self):
while self.svcMgrThread : # for as long as the svc mgr thread is still here
self.svcMgrThread.procIpcBatch() # regular processing,
time.sleep(0.5) # pause, before next round
print("Service Manager Thread (with subprocess) has ended, main thread now exiting...")
def startTaosService(self):
if self.svcMgrThread:
raise RuntimeError("Cannot start TAOS service when one may already be running")
self.svcMgrThread = ServiceManagerThread() # create the object
print("TAOS service started, printing out output...")
self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
print("TAOS service started")
def stopTaosService(self, outputLines = 20):
print("Terminating Service Manager Thread (SMT) execution...")
if not self.svcMgrThread:
raise RuntimeError("Unexpected empty svc mgr thread")
if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time
self.svcMgrThread = None
print("----- End of TDengine Service Output -----\n")
print("SMT execution terminated")
print("WARNING: SMT did not terminate as expected")
def run(self):
self._procIpcAll() # pump/process all the messages
if self.svcMgrThread: # if sig handler hasn't destroyed it by now
self.stopTaosService() # should have started already
class ServiceManagerThread:
def __init__(self):
self._tdeSubProcess = None
self._thread = None
self._status = None
def getStatus(self):
return self._status
def isRunning(self):
# return self._thread and self._thread.is_alive()
return self._status == MainExec.STATUS_RUNNING
def isStopping(self):
return self._status == MainExec.STATUS_STOPPING
def isStopped(self):
return self._status == MainExec.STATUS_STOPPED
# Start the thread (with sub process), and wait for the sub service
# to become fully operational
def start(self):
if self._thread :
raise RuntimeError("Unexpected _thread")
if self._tdeSubProcess :
raise RuntimeError("TDengine sub process already created/running")
self._status = MainExec.STATUS_STARTING
self._tdeSubProcess = TdeSubProcess()
self._ipcQueue = Queue()
self._thread = threading.Thread(
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
self._thread.daemon = True # thread dies with the program
# wait for service to start
for i in range(0, 10) :
# self.procIpcBatch() # don't pump message during start up
print("_zz_", end="", flush=True)
if self._status == MainExec.STATUS_RUNNING :
logger.info("[] TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stop(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
if self.isStopped():
print("Service already stopped")
if self.isStopping():
print("Service is already being stopped")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self._tdeSubProcess :
raise RuntimeError("sub process object missing")
def joinIoThread(self):
if self.ioThread :
self.ioThread = None
self._status = MainExec.STATUS_STOPPING
if self._tdeSubProcess.isRunning(): # still running
print("FAILED to stop sub process, it is still running... pid = {}".format(self.subProcess.pid))
self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
def join(self):
# TODO: sanity check
if not self.isStopping():
raise RuntimeError("Unexpected status when ending svc mgr thread: {}".format(self._status))
if self._thread :
self._thread = None
self._status = MainExec.STATUS_STOPPED
else :
print("Joining empty thread, doing nothing")
def _trimQueue(self, targetSize):
if targetSize <= 0:
return # do nothing
q = self._ipcQueue
if (q.qsize() <= targetSize ) : # no need to trim
logger.debug("Triming IPC queue to target size: {}".format(targetSize))
itemsToTrim = q.qsize() - targetSize
for i in range(0, itemsToTrim) :
except Empty:
break # break out of for loop, no more trimming
TD_READY_MSG = "TDengine is initialized successfully"
def _procIpcBatch(self):
def procIpcBatch(self, trimToTarget = 0, forceOutput = False):
self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process, managed by IO thread
while True :
print("<", end="", flush=True)
while True :
line = self.ipcQueue.get_nowait() # getting output at fast speed
print("_o", end="", flush=True)
if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self.status = MainExec.STATUS_RUNNING
line = self._ipcQueue.get_nowait() # getting output at fast speed
except Empty:
# time.sleep(2.3) # wait only if there's no output
# no more output
print(".>", end="", flush=True)
return # we are done with THIS BATCH
else: # got line
else: # got line, printing out
if forceOutput:
print(">", end="", flush=True)
_ProgressBars = ["--", "//", "||", "\\\\"]
def _printProgress(self, msg): # TODO: assuming 2 chars
print(msg, end="", flush=True)
pBar = self._ProgressBars[Dice.throw(4)]
print(pBar, end="", flush=True)
print('\b\b\b\b', end="", flush=True)
def _procIpcAll(self):
while True :
print("<", end="", flush=True)
self._procIpcBatch() # process one batch
# check if the ioThread is still running
if (not self.ioThread) or (not self.ioThread.is_alive()):
print("IO Thread (with subprocess) has ended, main thread now exiting...")
self._procIpcBatch() # one more batch
return # TODO: maybe one last batch?
# Maybe handler says we should exit now
if self.shouldStop:
print("Main thread ending all IPC processing with IOThread/SubProcess")
self._procIpcBatch() # one more batch
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
if self._status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self._status = MainExec.STATUS_RUNNING
# Trim the queue if necessary: TODO: try this 1 out of 10 times
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self.isStopping() : # TODO: use thread status instead
print("_w", end="", flush=True) # WAITING for stopping sub process to finish its outptu
# queue.put(line)
print("\nNo more output from IO thread managing TDengine service") # meaning sub process must have died
class TdeSubProcess:
def __init__(self):
self.subProcess = None
def getStdOut(self):
return self.subProcess.stdout
print(">", end="", flush=True)
def isRunning(self):
return self.subProcess != None
def startTaosService(self):
def start(self):
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
......@@ -1782,41 +1886,18 @@ class SvcManager:
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
self.ipcQueue = Queue()
if self.ioThread :
raise RuntimeError("Corrupt thread state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue))
self.ioThread.daemon = True # thread dies with the program
self.shouldStop = False # don't let the main loop stop
self.status = MainExec.STATUS_STARTING
# wait for service to start
for i in range(0, 10) :
self._procIpcBatch() # pump messages
print("_zz_", end="", flush=True)
if self.status == MainExec.STATUS_RUNNING :
print("TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
def stopTaosService(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self.subProcess :
print("Process already stopped")
def stop(self):
if not self.subProcess:
print("Sub process already stopped")
retCode = self.subProcess.poll()
if retCode : # valid return code, process ended
self.subProcess = None
else: # process still alive, let's interrupt it
print("Sub process still running, sending SIG_INT and waiting for it to stop...")
print("Sub process is running, sending SIG_INT and waiting for it to terminate...")
self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end
try :
......@@ -1826,41 +1907,20 @@ class SvcManager:
print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None
if self.subProcess and (not self.subProcess.poll()):
print("Sub process is still running... pid = {}".format(self.subProcess.pid))
self.shouldStop = True
def run(self):
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
print("End of loop reading from IPC queue")
self.joinIoThread() # should have started already
print("SvcManager Run Finished")
class ClientManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.status = MainExec.STATUS_RUNNING
self._status = MainExec.STATUS_RUNNING
self.tc = None
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
if self._status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
......@@ -1904,16 +1964,16 @@ class ClientManager:
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
# print("TC failed = {}".format(self.tc.isFailed()))
if gConfig.auto_start_service :
# Print exec status, etc., AFTER showing messages from the server
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
......@@ -1924,9 +1984,9 @@ class ClientManager:
class MainExec:
# STATUS_STOPPED = 3 # Not used yet
def runClient(cls):
......@@ -2015,10 +2075,6 @@ def main():
global gConfig
gConfig = parser.parse_args()
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
# Logging Stuff
global logger
......@@ -2034,15 +2090,14 @@ def main():
Dice.seed(0) # initial seeding of dice
# Run server or client
if gConfig.run_tdengine : # run server
else :
return MainExec.runClient()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__":
exitCode = main()
# print("Exiting with code: {}".format(exitCode))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册