import os import io import sys import threading import signal import logging import time import subprocess from typing import IO, List try: import psutil except: print("Psutil module needed, please install: sudo pip3 install psutil") sys.exit(-1) from queue import Queue, Empty from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress from .db import DbConn, DbTarget import crash_gen.settings class TdeInstance(): """ A class to capture the *static* information of a TDengine instance, including the location of the various files/directories, and basica configuration. """ @classmethod def _getBuildPath(cls): selfPath = os.path.dirname(os.path.realpath(__file__)) if ("community" in selfPath): projPath = selfPath[:selfPath.find("communit")] else: projPath = selfPath[:selfPath.find("tests")] buildPath = None for root, dirs, files in os.walk(projPath): if ("taosd" in files): rootRealPath = os.path.dirname(os.path.realpath(root)) if ("packaging" not in rootRealPath): buildPath = root[:len(root) - len("/build/bin")] break if buildPath == None: raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}" .format(selfPath, projPath)) return buildPath @classmethod def prepareGcovEnv(cls, env): # Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html bPath = cls._getBuildPath() # build PATH numSegments = len(bPath.split('/')) # "/x/TDengine/build" should yield 3 # numSegments += 2 # cover "/src" after build # numSegments = numSegments - 1 # DEBUG only env['GCOV_PREFIX'] = bPath + '/src_s' # Server side source env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings # VERY VERY important note: GCOV data collection NOT effective upon SIG_KILL Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format( numSegments, env['GCOV_PREFIX'] )) def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030): self._buildDir = self._getBuildPath() self._subdir = '/' + subdir # TODO: tolerate "/" self._port = port # TODO: support different IP address too self._fepPort = fepPort self._tInstNum = tInstNum self._smThread = ServiceManagerThread() def getDbTarget(self): return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) def getPort(self): return self._port def __repr__(self): return "[TdeInstance: {}, subdir={}]".format( self._buildDir, Helper.getFriendlyPath(self._subdir)) def generateCfgFile(self): # print("Logger = {}".format(logger)) # buildPath = self.getBuildPath() # taosdPath = self._buildPath + "/build/bin/taosd" cfgDir = self.getCfgDir() cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed if os.path.exists(cfgFile): if os.path.isfile(cfgFile): Logging.warning("Config file exists already, skip creation: {}".format(cfgFile)) return # cfg file already exists, nothing to do else: raise CrashGenError("Invalid config file: {}".format(cfgFile)) # Now that the cfg file doesn't exist if os.path.exists(cfgDir): if not os.path.isdir(cfgDir): raise CrashGenError("Invalid config dir: {}".format(cfgDir)) # else: good path else: os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p" # Now we have a good cfg dir cfgValues = { 'runDir': self.getRunDir(), 'ip': '127.0.0.1', # TODO: change to a network addressable ip 'port': self._port, 'fepPort': self._fepPort, } cfgTemplate = """ dataDir {runDir}/data logDir {runDir}/log charset UTF-8 firstEp {ip}:{fepPort} fqdn {ip} serverPort {port} # was all 135 below dDebugFlag 135 cDebugFlag 135 rpcDebugFlag 135 qDebugFlag 135 # httpDebugFlag 143 # asyncLog 0 # tables 10 maxtablesPerVnode 10 rpcMaxTime 101 # cache 2 keep 36500 # walLevel 2 walLevel 1 # # maxConnections 100 quorum 2 """ cfgContent = cfgTemplate.format_map(cfgValues) f = open(cfgFile, "w") f.write(cfgContent) f.close() def rotateLogs(self): logPath = self.getLogDir() # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 if os.path.exists(logPath): logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') Logging.info("Saving old log files to: {}".format(logPathSaved)) os.rename(logPath, logPathSaved) # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms def getExecFile(self): # .../taosd return self._buildDir + "/build/bin/taosd" def getRunDir(self): # TODO: rename to "root dir" ?! return self._buildDir + self._subdir def getCfgDir(self): # path, not file return self.getRunDir() + "/cfg" def getLogDir(self): return self.getRunDir() + "/log" def getHostAddr(self): return "127.0.0.1" def getServiceCmdLine(self): # to start the instance cmdLine = [] if crash_gen.settings.gConfig.track_memory_leaks: Logging.info("Invoking VALGRIND on service...") cmdLine = ['valgrind', '--leak-check=yes'] cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return cmdLine def _getDnodes(self, dbc): dbc.query("show dnodes") cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'} def createDnode(self, dbt: DbTarget): """ With a connection to the "first" EP, let's create a dnode for someone else who wants to join. """ dbc = DbConn.createNative(self.getDbTarget()) dbc.open() if dbt.getEp() in self._getDnodes(dbc): Logging.info("Skipping DNode creation for: {}".format(dbt)) dbc.close() return sql = "CREATE DNODE \"{}\"".format(dbt.getEp()) dbc.execute(sql) dbc.close() def getStatus(self): return self._smThread.getStatus() def getSmThread(self): return self._smThread def start(self): if not self.getStatus().isStopped(): raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus())) Logging.info("Starting TDengine instance: {}".format(self)) self.generateCfgFile() # service side generates config file, client does not self.rotateLogs() self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions def stop(self): self._smThread.stop() def isFirst(self): return self._tInstNum == 0 class TdeSubProcess: """ A class to to represent the actual sub process that is the run-time of a TDengine instance. It takes a TdeInstance object as its parameter, with the rationale being "a sub process runs an instance". """ # RET_ALREADY_STOPPED = -1 # RET_TIME_OUT = -3 # RET_SUCCESS = -4 def __init__(self): self.subProcess = None # type: subprocess.Popen # if tInst is None: # raise CrashGenError("Empty instance not allowed in TdeSubProcess") # self._tInst = tInst # Default create at ServiceManagerThread def __repr__(self): if self.subProcess is None: return '[TdeSubProc: Empty]' return '[TdeSubProc: pid = {}]'.format(self.getPid()) def getStdOut(self): return self.subProcess.stdout def getStdErr(self): return self.subProcess.stderr def isRunning(self): return self.subProcess is not None def getPid(self): return self.subProcess.pid def start(self, cmdLine): ON_POSIX = 'posix' in sys.builtin_module_names # Sanity check if self.subProcess: # already there raise RuntimeError("Corrupt process state") # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment myEnv = os.environ.copy() TdeInstance.prepareGcovEnv(myEnv) # print(myEnv) # print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine via Shell: {}".format(cmdLineStr)) useShell = True # Needed to pass environments into it self.subProcess = subprocess.Popen( # ' '.join(cmdLine) if useShell else cmdLine, # shell=useShell, ' '.join(cmdLine), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, # bufsize=1, # not supported in binary mode close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm def stop(self): """ Stop a sub process, DO NOT return anything, process all conditions INSIDE Common POSIX signal values (from man -7 signal): SIGHUP 1 SIGINT 2 SIGQUIT 3 SIGILL 4 SIGTRAP 5 SIGABRT 6 SIGIOT 6 SIGBUS 7 SIGEMT - SIGFPE 8 SIGKILL 9 SIGUSR1 10 SIGSEGV 11 SIGUSR2 12 """ if not self.subProcess: Logging.error("Sub process already stopped") return retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) if retCode: # valid return code, process ended # retCode = -retCode # only if valid Logging.warning("TSP.stop(): process ended itself") self.subProcess = None return # process still alive, let's interrupt it self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception self.subProcess = None # sub process should end, then IPC queue should end, causing IO thread to end @classmethod def _stopForSure(cls, proc: subprocess.Popen, sig: int): ''' Stop a process and all sub processes with a singal, and SIGKILL if necessary ''' def doKillTdService(proc: subprocess.Popen, sig: int): Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig)) proc.send_signal(sig) try: retCode = proc.wait(20) if (- retCode) == signal.SIGSEGV: # Crashed Logging.warning("Process {} CRASHED, please check CORE file!".format(proc.pid)) elif (- retCode) == sig : Logging.info("TD service terminated with expected return code {}".format(sig)) else: Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) return True # terminated successfully except subprocess.TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig)) return False # failed to terminate def doKillChild(child: psutil.Process, sig: int): Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig)) child.send_signal(sig) try: retCode = child.wait(20) if (- retCode) == signal.SIGSEGV: # Crashed Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid)) elif (- retCode) == sig : Logging.info("Sub-sub process terminated with expected return code {}".format(sig)) else: Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) return True # terminated successfully except psutil.TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) return False # did not terminate def doKill(proc: subprocess.Popen, sig: int): pid = proc.pid try: topSubProc = psutil.Process(pid) for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False Logging.warning("Unexpected child to be killed") doKillChild(child, sig) except psutil.NoSuchProcess as err: Logging.info("Process not found, can't kill, pid = {}".format(pid)) return doKillTdService(proc, sig) # TODO: re-examine if we need to kill the top process, which is always the SHELL for now # try: # proc.wait(1) # SHELL process here, may throw subprocess.TimeoutExpired exception # # expRetCode = self.SIG_KILL_RETCODE if sig==signal.SIGKILL else (-sig) # # if retCode == expRetCode: # # Logging.info("Process terminated with expected return code {}".format(retCode)) # # else: # # Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(expRetCode, retCode)) # # return True # success # except subprocess.TimeoutExpired as err: # Logging.warning("Failed to kill process {} with signal {}".format(pid, sig)) # return False # failed to kill def softKill(proc, sig): return doKill(proc, sig) def hardKill(proc): return doKill(proc, signal.SIGKILL) pid = proc.pid Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig)) if softKill(proc, sig): return# success if sig != signal.SIGKILL: # really was soft above if hardKill(proc): return raise CrashGenError("Failed to stop process, pid={}".format(pid)) class ServiceManager: PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process def __init__(self, numDnodes): # >1 when we run a cluster Logging.info("TDengine Service Manager (TSM) created") self._numDnodes = numDnodes # >1 means we have a cluster self._lock = threading.Lock() # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec # signal.signal(signal.SIGINT, self.sigIntHandler) # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! self.inSigHandler = False # self._status = MainExec.STATUS_RUNNING # set inside # _startTaosService() self._runCluster = (numDnodes > 1) self._tInsts : List[TdeInstance] = [] for i in range(0, numDnodes): ti = self._createTdeInstance(i) # construct tInst self._tInsts.append(ti) # self.svcMgrThreads : List[ServiceManagerThread] = [] # for i in range(0, numDnodes): # thread = self._createThread(i) # construct tInst # self.svcMgrThreads.append(thread) def _createTdeInstance(self, dnIndex): if not self._runCluster: # single instance subdir = 'test' else: # Create all threads in a cluster subdir = 'cluster_dnode_{}'.format(dnIndex) fepPort= 6030 # firstEP Port port = fepPort + dnIndex * 100 return TdeInstance(subdir, dnIndex, port, fepPort) # return ServiceManagerThread(dnIndex, ti) def _doMenu(self): choice = "" while True: print("\nInterrupting Service Program, Choose an Action: ") print("1: Resume") print("2: Terminate") print("3: Restart") # Remember to update the if range below # print("Enter Choice: ", end="", flush=True) while choice == "": choice = input("Enter Choice: ") if choice != "": break # done with reading repeated input if choice in ["1", "2", "3"]: break # we are done with whole method print("Invalid choice, please try again.") choice = "" # reset return choice def sigUsrHandler(self, signalNumber, frame): print("Interrupting main thread execution upon SIGUSR1") if self.inSigHandler: # already print("Ignoring repeated SIG...") return # do nothing if it's already not running self.inSigHandler = True choice = self._doMenu() if choice == "1": self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? elif choice == "2": self.stopTaosServices() elif choice == "3": # Restart self.restart() else: raise RuntimeError("Invalid menu choice: {}".format(choice)) self.inSigHandler = False def sigIntHandler(self, signalNumber, frame): print("ServiceManager: INT Signal Handler starting...") if self.inSigHandler: print("Ignoring repeated SIG_INT...") return self.inSigHandler = True self.stopTaosServices() print("ServiceManager: INT Signal Handler returning...") self.inSigHandler = False def sigHandlerResume(self): print("Resuming TDengine service manager (main thread)...\n\n") # def _updateThreadStatus(self): # if self.svcMgrThread: # valid svc mgr thread # if self.svcMgrThread.isStopped(): # done? # self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate? # self.svcMgrThread = None # no more def isActive(self): """ Determine if the service/cluster is active at all, i.e. at least one thread is not "stopped". """ for ti in self._tInsts: if not ti.getStatus().isStopped(): return True return False def isRunning(self): for ti in self._tInsts: if not ti.getStatus().isRunning(): return False return True # def isRestarting(self): # """ # Determine if the service/cluster is being "restarted", i.e., at least # one thread is in "restarting" status # """ # for thread in self.svcMgrThreads: # if thread.isRestarting(): # return True # return False def isStable(self): """ Determine if the service/cluster is "stable", i.e. all of the threads are in "stable" status. """ for ti in self._tInsts: if not ti.getStatus().isStable(): return False return True def _procIpcAll(self): while self.isActive(): Progress.emit(Progress.SERVICE_HEART_BEAT) for ti in self._tInsts: # all thread objects should always be valid # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here status = ti.getStatus() if status.isRunning(): th = ti.getSmThread() th.procIpcBatch() # regular processing, if status.isStopped(): th.procIpcBatch() # one last time? # self._updateThreadStatus() time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round # raise CrashGenError("dummy") Logging.info("Service Manager Thread (with subprocess) ended, main thread exiting...") def _getFirstInstance(self): return self._tInsts[0] def startTaosServices(self): with self._lock: if self.isActive(): raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running") # Find if there's already a taosd service, and then kill it for proc in psutil.process_iter(): if proc.name() == 'taosd': Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") time.sleep(2.0) proc.kill() # print("Process: {}".format(proc.name())) # self.svcMgrThread = ServiceManagerThread() # create the object for ti in self._tInsts: ti.start() if not ti.isFirst(): tFirst = self._getFirstInstance() tFirst.createDnode(ti.getDbTarget()) ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines def stopTaosServices(self): with self._lock: if not self.isActive(): Logging.warning("Cannot stop TAOS service(s), already not active") return for ti in self._tInsts: ti.stop() def run(self): self.startTaosServices() self._procIpcAll() # pump/process all the messages, may encounter SIG + restart if self.isActive(): # if sig handler hasn't destroyed it by now self.stopTaosServices() # should have started already def restart(self): if not self.isStable(): Logging.warning("Cannot restart service/cluster, when not stable") return # self._isRestarting = True if self.isActive(): self.stopTaosServices() else: Logging.warning("Service not active when restart requested") self.startTaosServices() # self._isRestarting = False # def isRunning(self): # return self.svcMgrThread != None # def isRestarting(self): # return self._isRestarting class ServiceManagerThread: """ A class representing a dedicated thread which manages the "sub process" of the TDengine service, interacting with its STDOUT/ERR. It takes a TdeInstance parameter at creation time, or create a default """ MAX_QUEUE_SIZE = 10000 def __init__(self): # Set the sub process self._tdeSubProcess = None # type: TdeSubProcess # Arrange the TDengine instance # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based # self._tInst = tInst or TdeInstance() # Need an instance self._thread = None # The actual thread, # type: threading.Thread self._thread2 = None # watching stderr self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually. def __repr__(self): return "[SvcMgrThread: status={}, subProc={}]".format( self.getStatus(), self._tdeSubProcess) def getStatus(self): ''' Get the status of the process being managed. (misnomer alert!) ''' return self._status # Start the thread (with sub process), and wait for the sub service # to become fully operational def start(self, cmdLine : str, logDir: str): ''' Request the manager thread to start a new sub process, and manage it. :param cmdLine: the command line to invoke :param logDir: the logging directory, to hold stdout/stderr files ''' if self._thread: raise RuntimeError("Unexpected _thread") if self._tdeSubProcess: raise RuntimeError("TDengine sub process already created/running") Logging.info("Attempting to start TAOS service: {}".format(self)) self._status.set(Status.STATUS_STARTING) self._tdeSubProcess = TdeSubProcess() self._tdeSubProcess.start(cmdLine) # TODO: verify process is running self._ipcQueue = Queue() self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) self._thread.daemon = True # thread dies with the program self._thread.start() time.sleep(0.01) if not self._thread.is_alive(): # What happened? Logging.info("Failed to started process to monitor STDOUT") self.stop() raise CrashGenError("Failed to start thread to monitor STDOUT") Logging.info("Successfully started process to monitor STDOUT") self._thread2 = threading.Thread( # 2nd thread captures server ERRORs target=self.svcErrorReader, args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir)) self._thread2.daemon = True # thread dies with the program self._thread2.start() time.sleep(0.01) if not self._thread2.is_alive(): self.stop() raise CrashGenError("Failed to start thread to monitor STDERR") # wait for service to start for i in range(0, 100): time.sleep(1.0) # self.procIpcBatch() # don't pump message during start up Progress.emit(Progress.SERVICE_START_NAP) # print("_zz_", end="", flush=True) if self._status.isRunning(): Logging.info("[] TDengine service READY to process requests") Logging.info("[] TAOS service started: {}".format(self)) # self._verifyDnode(self._tInst) # query and ensure dnode is ready # Logging.debug("[] TAOS Dnode verified: {}".format(self)) return # now we've started # TODO: handle failure-to-start better? self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output raise RuntimeError("TDengine service did not start successfully: {}".format(self)) def _verifyDnode(self, tInst: TdeInstance): dbc = DbConn.createNative(tInst.getDbTarget()) dbc.open() dbc.query("show dnodes") # dbc.query("DESCRIBE {}.{}".format(dbName, self._stName)) cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason # ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type isValid = False for col in cols: # print("col = {}".format(col)) ep = col[1].split(':') # 10.1.30.2:6030 print("Found ep={}".format(ep)) if tInst.getPort() == int(ep[1]): # That's us # print("Valid Dnode matched!") isValid = True # now we are valid break if not isValid: print("Failed to start dnode, sleep for a while") time.sleep(600) raise RuntimeError("Failed to start Dnode, expected port not found: {}". format(tInst.getPort())) dbc.close() def stop(self): # can be called from both main thread or signal handler Logging.info("Terminating TDengine service running as the sub process...") if self.getStatus().isStopped(): Logging.info("Service already stopped") return if self.getStatus().isStopping(): Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid())) return # Linux will send Control-C generated SIGINT to the TDengine process # already, ref: # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes if not self._tdeSubProcess: raise RuntimeError("sub process object missing") self._status.set(Status.STATUS_STOPPING) # retCode = self._tdeSubProcess.stop() # try: # retCode = self._tdeSubProcess.stop() # # print("Attempted to stop sub process, got return code: {}".format(retCode)) # if retCode == signal.SIGSEGV : # SGV # Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") # except subprocess.TimeoutExpired as err: # Logging.info("Time out waiting for TDengine service process to exit") if not self._tdeSubProcess.stop(): # everything withing if self._tdeSubProcess.isRunning(): # still running, should now never happen Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( self._tdeSubProcess.getPid())) else: self._tdeSubProcess = None # not running any more self.join() # stop the thread, change the status, etc. # Check if it's really stopped outputLines = 10 # for last output if self.getStatus().isStopped(): self.procIpcBatch(outputLines) # one last time Logging.debug("End of TDengine Service Output: {}".format(self)) Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n") else: print("WARNING: SMT did not terminate as expected: {}".format(self)) def join(self): # TODO: sanity check if not self.getStatus().isStopping(): raise RuntimeError( "SMT.Join(): Unexpected status: {}".format(self._status)) if self._thread or self._thread2 : if self._thread: self._thread.join() self._thread = None if self._thread2: # STD ERR thread self._thread2.join() self._thread2 = None else: print("Joining empty thread, doing nothing") self._status.set(Status.STATUS_STOPPED) def _trimQueue(self, targetSize): if targetSize <= 0: return # do nothing q = self._ipcQueue if (q.qsize() <= targetSize): # no need to trim return Logging.debug("Triming IPC queue to target size: {}".format(targetSize)) itemsToTrim = q.qsize() - targetSize for i in range(0, itemsToTrim): try: q.get_nowait() except Empty: break # break out of for loop, no more trimming TD_READY_MSG = "TDengine is initialized successfully" def procIpcBatch(self, trimToTarget=0, forceOutput=False): self._trimQueue(trimToTarget) # trim if necessary # Process all the output generated by the underlying sub process, # managed by IO thread print("<", end="", flush=True) while True: try: line = self._ipcQueue.get_nowait() # getting output at fast speed self._printProgress("_o") except Empty: # time.sleep(2.3) # wait only if there's no output # no more output print(".>", end="", flush=True) return # we are done with THIS BATCH else: # got line, printing out if forceOutput: Logging.info('[TAOSD] ' + line) else: Logging.debug('[TAOSD] ' + line) print(">", end="", flush=True) _ProgressBars = ["--", "//", "||", "\\\\"] def _printProgress(self, msg): # TODO: assuming 2 chars print(msg, end="", flush=True) pBar = self._ProgressBars[Dice.throw(4)] print(pBar, end="", flush=True) print('\b\b\b\b', end="", flush=True) def svcOutputReader(self, out: IO, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. :param out: the IO stream object used to fetch the data from :param queue: the queue where we dump the roughly parsed line-by-line data :param logDir: where we should dump a verbatim output file ''' os.makedirs(logDir, exist_ok=True) logFile = os.path.join(logDir,'stdout.log') fOut = open(logFile, 'wb') # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") # for line in out : for line in iter(out.readline, b''): fOut.write(line) # print("Finished reading a line: {}".format(line)) # print("Adding item to queue...") try: line = line.decode("utf-8").rstrip() except UnicodeError: print("\nNon-UTF8 server output: {}\n".format(line)) # This might block, and then causing "out" buffer to block queue.put(line) self._printProgress("_i") if self._status.isStarting(): # we are starting, let's see if we have started if line.find(self.TD_READY_MSG) != -1: # found Logging.info("Waiting for the service to become FULLY READY") time.sleep(1.0) # wait for the server to truly start. TODO: remove this Logging.info("Service is now FULLY READY") # TODO: more ID info here? self._status.set(Status.STATUS_RUNNING) # Trim the queue if necessary: TODO: try this 1 out of 10 times self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size if self._status.isStopping(): # TODO: use thread status instead # WAITING for stopping sub process to finish its outptu print("_w", end="", flush=True) # queue.put(line) # meaning sub process must have died Logging.info("EOF for TDengine STDOUT: {}".format(self)) out.close() # Close the stream fOut.close() # Close the output file def svcErrorReader(self, err: IO, queue, logDir: str): os.makedirs(logDir, exist_ok=True) logFile = os.path.join(logDir,'stderr.log') fErr = open(logFile, 'wb') for line in iter(err.readline, b''): fErr.write(line) Logging.info("TDengine STDERR: {}".format(line)) Logging.info("EOF for TDengine STDERR: {}".format(self)) err.close() fErr.close()