from __future__ import annotations

import io
import logging
import os
import signal
import sys
import threading
import time
from enum import Enum
from queue import Queue, Empty
from subprocess import PIPE, Popen, TimeoutExpired
from typing import IO, List, NewType, Optional

try:
    import psutil
except ImportError:
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
from crash_gen.db import DbConn, DbTarget
from crash_gen.settings import Settings
from crash_gen.types import DirPath

class TdeInstance():
    """
    A class to capture the *static* information of a TDengine instance,
    including the location of the various files/directories, and basic
    configuration.

    An instance holds at most one ``TdeSubProcess`` in ``self._subProcess``;
    that attribute is ``None`` whenever the instance is not running.
    """

    @classmethod
    def _getBuildPath(cls):
        """
        Locate the build root by walking the project tree for a "taosd" file.

        The project root is the part of this file's real path preceding
        "communit" (when "community" occurs in the path) or "tests" otherwise.
        Directories whose real parent path contains "packaging" are ignored.

        :return: the first matching directory with a "/build/bin"-sized suffix
                 cut off
        :raises RuntimeError: if the path marker is absent or no "taosd" is found
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        marker = "communit" if ("community" in selfPath) else "tests"
        markerPos = selfPath.find(marker)
        if markerPos < 0:
            # str.find() returns -1 when the marker is missing; slicing with
            # it would silently chop the last character off the path.
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                .format(selfPath, None))
        projPath = selfPath[:markerPos]

        for root, dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    # NOTE(review): assumes root ends with "/build/bin" - confirm
                    return root[:len(root) - len("/build/bin")]
        raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
            .format(selfPath, projPath))

    @classmethod
    def prepareGcovEnv(cls, env):
        """
        Add the GCOV cross-profiling variables to ``env`` (modified in place).

        Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html

        :param env: mapping of environment variables for the sub process
        """
        bPath = cls._getBuildPath() # build PATH
        # The empty string before the leading "/" counts as a segment too,
        # e.g. "/x/TDengine" splits into 3 elements.
        numSegments = len(bPath.split('/'))
        env['GCOV_PREFIX'] = bPath + '/src_s' # Server side source
        env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
        # VERY VERY important note: GCOV data collection NOT effective upon SIG_KILL
        Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
            numSegments, env['GCOV_PREFIX'] ))

    def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
        """
        :param subdir: run directory name, placed directly under the build path
        :param tInstNum: zero-based instance number; 0 marks the "first" instance
        :param port: value written as ``serverPort`` in the generated config
        :param fepPort: port written into ``firstEp`` in the generated config
        """
        self._buildDir  = self._getBuildPath()
        self._subdir    = '/' + subdir # TODO: tolerate "/"
        self._port      = port # TODO: support different IP address too
        self._fepPort   = fepPort

        self._tInstNum    = tInstNum

        # A "Tde Instance" will *contain* a "sub process" object, which will/may use a thread internally
        self._subProcess  = None # type: Optional[TdeSubProcess]

    def getDbTarget(self):
        """Return a DbTarget built from the cfg dir, host address and port."""
        return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)

    def getPort(self):
        """Return the port given at construction time."""
        return self._port

    def __repr__(self):
        return "[TdeInstance: {}, subdir={}]".format(
            self._buildDir, Helper.getFriendlyPath(self._subdir))

    def generateCfgFile(self):
        """
        Create "<cfgDir>/taos.cfg" from the built-in template unless it exists.

        :raises CrashGenError: if the config path exists but is not a regular
                               file, or the config dir path is not a directory
        """
        cfgDir  = self.getCfgDir()
        cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed
        if os.path.exists(cfgFile):
            if os.path.isfile(cfgFile):
                Logging.warning("Config file exists already, skip creation: {}".format(cfgFile))
                return # cfg file already exists, nothing to do
            else:
                raise CrashGenError("Invalid config file: {}".format(cfgFile))
        # Now that the cfg file doesn't exist
        if os.path.exists(cfgDir):
            if not os.path.isdir(cfgDir):
                raise CrashGenError("Invalid config dir: {}".format(cfgDir))
            # else: good path
        else:
            os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p"
        # Now we have a good cfg dir
        cfgValues = {
            'runDir':   self.getRunDir(),
            'ip':       '127.0.0.1', # TODO: change to a network addressable ip
            'port':     self._port,
            'fepPort':  self._fepPort,
        }
        cfgTemplate = """
dataDir {runDir}/data
logDir  {runDir}/log

charset UTF-8

firstEp {ip}:{fepPort}
fqdn {ip}
serverPort {port}

# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
quorum 2
"""
        cfgContent = cfgTemplate.format_map(cfgValues)
        # "with" guarantees the handle is closed even if write() raises
        with open(cfgFile, "w") as f:
            f.write(cfgContent)

    def rotateLogs(self):
        """Rename an existing log dir by appending "_<timestamp>"; the dir is not recreated here."""
        logPath = self.getLogDir()
        # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            Logging.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)
        # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms

    def getExecFile(self): # .../taosd
        """Return the path "<buildDir>/build/bin/taosd"."""
        return self._buildDir + "/build/bin/taosd"

    def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?!
        """Return the build dir joined with this instance's subdir."""
        return DirPath(self._buildDir + self._subdir)

    def getCfgDir(self) -> DirPath : # path, not file
        """Return "<runDir>/cfg"."""
        return DirPath(self.getRunDir() + "/cfg")

    def getLogDir(self) -> DirPath :
        """Return "<runDir>/log"."""
        return DirPath(self.getRunDir() + "/log")

    def getHostAddr(self):
        """Return the host address, currently always "127.0.0.1"."""
        return "127.0.0.1"

    def getServiceCmdLine(self): # to start the instance
        """
        Build the command line pieces used to start the instance.

        The pieces are prefixed with valgrind when the ``track_memory_leaks``
        setting is on. TdeSubProcess joins them with spaces for a shell.
        """
        cmdLine = []
        if Settings.getConfig().track_memory_leaks:
            Logging.info("Invoking VALGRIND on service...")
            cmdLine = ['valgrind', '--leak-check=yes']
        # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
        cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
        return cmdLine

    def _getDnodes(self, dbc):
        """
        Run "show dnodes" on ``dbc`` and map column 1 to column 4 of each row.

        Per the column list noted below, that is end_point -> status.
        """
        dbc.query("show dnodes")
        cols = dbc.getQueryResult() #  id,end_point,vnodes,cores,status,role,create_time,offline reason
        return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'}

    def createDnode(self, dbt: DbTarget):
        """
        With a connection to the "first" EP, let's create a dnode for someone else who
        wants to join. Nothing is created if the end point is already listed.

        :param dbt: target whose end point (``dbt.getEp()``) should be registered
        """
        dbc = DbConn.createNative(self.getDbTarget())
        dbc.open()
        try:
            if dbt.getEp() in self._getDnodes(dbc):
                Logging.info("Skipping DNode creation for: {}".format(dbt))
                return

            sql = "CREATE DNODE \"{}\"".format(dbt.getEp())
            dbc.execute(sql)
        finally:
            # always release the connection, also when a query raises
            dbc.close()

    def getStatus(self):
        """Return the sub process status, or an EMPTY status when there is no sub process."""
        if self._subProcess is None:
            return Status(Status.STATUS_EMPTY)
        return self._subProcess.getStatus()

    def start(self):
        """
        Generate the config file, rotate the logs and launch the sub process.

        :raises CrashGenError: if the current status reports itself as active
        """
        if self.getStatus().isActive():
            raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))

        Logging.info("Starting TDengine instance: {}".format(self))
        self.generateCfgFile() # service side generates config file, client does not
        self.rotateLogs()

        self._subProcess = TdeSubProcess(self.getServiceCmdLine(),  self.getLogDir())

    def stop(self):
        """
        Stop the sub process, if any, and drop the reference to it.

        Calling this on an instance without a sub process is a no-op, so a
        loop that stops every instance also tolerates the ones that were
        never started or were already cleared by procIpcBatch().
        """
        subProc = self._subProcess
        if subProc is None:
            return
        subProc.stop()
        self._subProcess = None

    def isFirst(self):
        """Return True when this is instance number 0."""
        return self._tInstNum == 0

    def printFirst10Lines(self):
        """Force one IPC batch trimmed to a target of 10; warn if there is no sub process."""
        if self._subProcess is None:
            Logging.warning("Incorrect TI status for procIpcBatch-10 operation")
            return
        self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True)

    def procIpcBatch(self):
        """Process one IPC batch, and release the sub process once it reports stopped."""
        if self._subProcess is None:
            Logging.warning("Incorrect TI status for procIpcBatch operation")
            return
        self._subProcess.procIpcBatch() # may encounter EOF and change status to STOPPED
        if self._subProcess.getStatus().isStopped():
            self._subProcess.stop()
            self._subProcess = None

class TdeSubProcess:
    """
    A class to represent the actual sub process that is the run-time
    of a TDengine instance.

    It is created from the command line and log directory of an instance;
    the process and its managing thread are started in the constructor.

    We aim to ensure that this object has exactly the same life-cycle as the
    underlying sub process.
    """

    def __init__(self, cmdLine: List[str], logDir: DirPath):
        """
        Create the process + managing thread immediately.

        :param cmdLine: command line pieces, joined with spaces and run via a shell
        :param logDir: logging directory handed to the ServiceManagerThread
        """
        Logging.info("Attempting to start TAOS sub process...")
        self._popen     = self._start(cmdLine) # the actual sub process
        self._smThread  = ServiceManagerThread(self, logDir)  # A thread to manage the sub process, mostly to process the IO
        Logging.info("Successfully started TAOS process: {}".format(self))

    def __repr__(self):
        return '[TdeSubProc: pid = {}, status = {}]'.format(
            self.getPid(), self.getStatus() )

    def getStdOut(self):
        """Return the stdout pipe of the sub process."""
        return self._popen.stdout

    def getStdErr(self):
        """Return the stderr pipe of the sub process."""
        return self._popen.stderr

    def getPid(self):
        """Return the pid of the Popen object."""
        return self._popen.pid

    def _start(self, cmdLine) -> Popen :
        """
        Launch the command through a shell, with GCOV variables added to a
        copy of the current environment and stdout/stderr captured as pipes.

        :param cmdLine: command line pieces, joined with single spaces
        :return: the Popen object of the launched process
        """
        ON_POSIX = 'posix' in sys.builtin_module_names

        # Prepare environment variables for coverage information
        # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
        myEnv = os.environ.copy()
        TdeInstance.prepareGcovEnv(myEnv)

        return Popen(
            ' '.join(cmdLine),
            shell=True, # Always use shell, since we need to pass ENV vars
            stdout=PIPE,
            stderr=PIPE,
            close_fds=ON_POSIX,
            env=myEnv
            )  # had text=True, which interferred with reading EOF

    STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
    SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm

    def stop(self):
        """
        Stop a sub process, DO NOT return anything, process all conditions INSIDE.

        Calling function should immediately delete/unreference the object

        Common POSIX signal values (from man -7 signal):
        SIGHUP           1
        SIGINT           2
        SIGQUIT          3
        SIGILL           4
        SIGTRAP          5
        SIGABRT          6
        SIGIOT           6
        SIGBUS           7
        SIGEMT           -
        SIGFPE           8
        SIGKILL          9
        SIGUSR1         10
        SIGSEGV         11
        SIGUSR2         12

        :raises CrashGenError: if the process cannot be stopped (from _stopForSure)
        """
        # self._popen should always be valid.

        Logging.info("Terminating TDengine service running as the sub process...")
        if self.getStatus().isStopped():
            Logging.info("Service already stopped")
            return
        if self.getStatus().isStopping():
            Logging.info("Service is already being stopped, pid: {}".format(self.getPid()))
            return

        self.setStatus(Status.STATUS_STOPPING)

        retCode = self._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
        # poll() yields None only while the process is alive; an exit code of
        # 0 is falsy but still means "ended", so compare against None.
        if retCode is not None:
            Logging.warning("TSP.stop(): process ended itself")
            return

        # process still alive, let's interrupt it
        self._stopForSure(self._popen, self.STOP_SIGNAL) # success if no exception

        # sub process should end, then IPC queue should end, causing IO thread to end
        self._smThread.stop() # stop for sure too

        self.setStatus(Status.STATUS_STOPPED)

    @classmethod
    def _stopForSure(cls, proc: Popen, sig: int):
        '''
        Stop a process and all sub processes with a signal, and SIGKILL if necessary

        :param proc: the Popen object of the process to stop
        :param sig: signal to try first
        :raises CrashGenError: if the process is still there after every attempt
        '''
        def doKillTdService(proc: Popen, sig: int):
            # Signal the Popen process itself and wait up to 20s for it
            Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
            proc.send_signal(sig)
            try:
                retCode = proc.wait(20)
                if (- retCode) == signal.SIGSEGV: # Crashed
                    Logging.warning("Process {} CRASHED, please check CORE file!".format(proc.pid))
                elif (- retCode) == sig :
                    Logging.info("TD service terminated with expected return code {}".format(sig))
                else:
                    Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
                return True # terminated successfully
            except TimeoutExpired:
                Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
            return False # failed to terminate

        def doKillChild(child: psutil.Process, sig: int):
            # Signal one descendant and wait up to 20s for it
            Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
            child.send_signal(sig)
            try:
                retCode = child.wait(20)
            except psutil.TimeoutExpired:
                Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
                return False # did not terminate
            if retCode is None:
                # psutil only reports an exit code for direct children of the
                # current process; negating None would raise TypeError.
                Logging.info("Sub-sub process {} terminated, return code not available".format(child.pid))
            elif (- retCode) == signal.SIGSEGV: # Crashed
                Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
            elif (- retCode) == sig :
                Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
            else:
                Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
            return True # terminated successfully

        def doKill(proc: Popen, sig: int):
            # Signal any descendants first, then the top process
            pid = proc.pid
            try:
                topSubProc = psutil.Process(pid) # Now that we are doing "exec -c", should not have children any more
                for child in topSubProc.children(recursive=True):  # or parent.children() for recursive=False
                    Logging.warning("Unexpected child to be killed")
                    doKillChild(child, sig)
            except psutil.NoSuchProcess:
                Logging.info("Process not found, can't kill, pid = {}".format(pid))

            return doKillTdService(proc, sig)
            # TODO: re-examine if we need to kill the top process, which is always the SHELL for now

        def softKill(proc, sig):
            return doKill(proc, sig)

        def hardKill(proc):
            return doKill(proc, signal.SIGKILL)

        pid = proc.pid
        Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
        if softKill(proc, sig):
            return # success
        if sig != signal.SIGKILL: # really was soft above
            if hardKill(proc):
                return
        raise CrashGenError("Failed to stop process, pid={}".format(pid))

    def getStatus(self):
        """Return the status kept by the managing thread."""
        return self._smThread.getStatus()

    def setStatus(self, status):
        """Set the status kept by the managing thread."""
        self._smThread.setStatus(status)

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """Forward to the managing thread's procIpcBatch() with the same arguments."""
        self._smThread.procIpcBatch(trimToTarget, forceOutput)
class ServiceManager:
    """
    Owns the TdeInstance objects of a single service or a cluster, starts and
    stops them together, and pumps their IPC output from the calling thread.
    """
    PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

    def __init__(self, numDnodes): # >1 when we run a cluster
        """
        :param numDnodes: number of instances to create; >1 means a cluster
        """
        Logging.info("TDengine Service Manager (TSM) created")
        self._numDnodes = numDnodes # >1 means we have a cluster
        self._lock = threading.Lock()
        # signal handlers are registered elsewhere (moved to MainExec)

        self.inSigHandler = False
        self._runCluster = (numDnodes > 1)
        self._tInsts : List[TdeInstance] = []
        for i in range(0, numDnodes):
            ti = self._createTdeInstance(i) # construct tInst
            self._tInsts.append(ti)

    def _createTdeInstance(self, dnIndex):
        """
        Build the TdeInstance for dnode number ``dnIndex``.

        A single instance uses subdir 'test'; cluster members use
        'cluster_dnode_<index>'. Ports are 6030 + 100 * dnIndex.
        """
        if not self._runCluster: # single instance
            subdir = 'test'
        else:        # Create all threads in a cluster
            subdir = 'cluster_dnode_{}'.format(dnIndex)
        fepPort= 6030 # firstEP Port
        port   = fepPort + dnIndex * 100
        return TdeInstance(subdir, dnIndex, port, fepPort)

    def _doMenu(self):
        """
        Print the interrupt menu and read a choice from stdin until it is valid.

        An empty answer re-prompts without reprinting the menu; any other
        invalid answer prints an error and shows the menu again.

        :return: one of "1", "2", "3"
        """
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the valid choices below
            choice = ""
            while choice == "":
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """
        SIGUSR1 handler: show the menu and resume, stop or restart accordingly.

        A signal arriving while a handler is already running is ignored.
        """
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
            elif choice == "2":
                self.stopTaosServices()
            elif choice == "3": # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # reset even on failure, otherwise every later signal is ignored
            self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """
        INT/TERM handler: stop all services.

        A signal arriving while a handler is already running is ignored.
        """
        print("ServiceManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosServices()
            print("ServiceManager: INT Signal Handler returning...")
        finally:
            # reset even on failure, otherwise every later signal is ignored
            self.inSigHandler = False

    def sigHandlerResume(self):
        """Menu choice "Resume": only prints a notice."""
        print("Resuming TDengine service manager (main thread)...\n\n")

    def isActive(self):
        """
        Determine if the service/cluster is active at all, i.e. at least
        one instance is active
        """
        return any(ti.getStatus().isActive() for ti in self._tInsts)

    def isRunning(self):
        """Return True when every instance reports a running status."""
        return all(ti.getStatus().isRunning() for ti in self._tInsts)

    def isStable(self):
        """
        Determine if the service/cluster is "stable", i.e. all of the
        instances are in "stable" status.
        """
        return all(ti.getStatus().isStable() for ti in self._tInsts)

    def _procIpcAll(self):
        """
        Process IPC batches of every running instance, pausing
        PAUSE_BETWEEN_IPC_CHECK seconds between rounds, for as long as at
        least one instance is active.
        """
        while self.isActive():
            Progress.emit(Progress.SERVICE_HEART_BEAT)
            for ti in self._tInsts: # all thread objects should always be valid
                status = ti.getStatus()
                if  status.isRunning():
                    ti.procIpcBatch()  # regular processing,
                    # NOTE(review): presumably `status` is updated in place by
                    # the call above, otherwise this branch is dead - confirm
                    if  status.isStopped():
                        ti.procIpcBatch() # one last time?

            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
        Logging.info("Service Manager Thread (with subprocess) ended, main thread exiting...")

    def _getFirstInstance(self):
        """Return the instance at index 0."""
        return self._tInsts[0]

    def startTaosServices(self):
        """
        Kill any running 'taosd' process, then start every instance in order.

        Each non-first instance is registered as a dnode through the first one.

        :raises RuntimeError: if any instance is already active
        """
        with self._lock:
            if self.isActive():
                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    if proc.name() != 'taosd':
                        continue
                    Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
                    time.sleep(2.0)
                    proc.kill()
                except psutil.NoSuchProcess:
                    # the process exited while we were iterating; nothing to kill
                    continue

            for ti in self._tInsts:
                ti.start()
                if not ti.isFirst():
                    tFirst = self._getFirstInstance()
                    tFirst.createDnode(ti.getDbTarget())
                ti.printFirst10Lines()

    def stopTaosServices(self):
        """Stop every instance; only logs a warning when nothing is active."""
        with self._lock:
            if not self.isActive():
                Logging.warning("Cannot stop TAOS service(s), already not active")
                return

            for ti in self._tInsts:
                ti.stop()

    def run(self):
        """Start the services, pump IPC until none is active, then stop what is left."""
        self.startTaosServices()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if  self.isActive():  # if sig handler hasn't destroyed it by now
            self.stopTaosServices()  # should have started already

    def restart(self):
        """Stop (if active) and start all services again; refused when not stable."""
        if not self.isStable():
            Logging.warning("Cannot restart service/cluster, when not stable")
            return

        if  self.isActive():
            self.stopTaosServices()
        else:
            Logging.warning("Service not active when restart requested")

        self.startTaosServices()
class ServiceManagerThread:
    """
    A class representing a dedicated thread which manages the "sub process"
    of the TDengine service, interacting with its STDOUT/ERR.

    It takes a TdeSubProcess and a log directory at creation time, and starts
    one reader thread for the sub process' STDOUT and one for its STDERR.
    """
    # Sizing reference for the IPC queue: svcOutputReader trims the queue back
    # to 90% of this value whenever it has grown beyond that.
    MAX_QUEUE_SIZE = 10000

673
    def __init__(self, subProc: TdeSubProcess, logDir: str):
        """
        Create the manager and immediately start monitoring the sub process.

        :param subProc: the sub process whose STDOUT/STDERR streams are read
        :param logDir: the logging directory, to hold stdout/stderr files
        """
        # The status of the underlying service, actually.
        self._status = Status(Status.STATUS_STOPPED)

        # Declare both reader threads up front: join() inspects these two
        # attributes, and _start() can reach stop()/join() before the second
        # thread has been created.
        self._thread: Optional[threading.Thread] = None   # STDOUT reader
        self._thread2: Optional[threading.Thread] = None  # STDERR reader

        self._start(subProc, logDir)

687
    def __repr__(self):
        """
        Deliberately unsupported: always raises CrashGenError, with a message
        pointing at TdeSubProcess.
        """
        reason = "SMT status moved to TdeSubProcess"
        raise CrashGenError(reason)
691 692

    def getStatus(self):
        '''
        Return the status object held by this manager. It describes the
        process being managed rather than the thread (misnomer alert!).
        '''
        currentStatus = self._status
        return currentStatus

698 699 700
    def setStatus(self, statusVal: int):
        """Update the held status object in place with the given status value."""
        status = self._status
        status.set(statusVal)

701 702
    # Start the thread (with sub process), and wait for the sub service
    # to become fully operational
703
    def _start(self, subProc: TdeSubProcess, logDir: str):
        '''
        Start the reader threads for the given sub process, then wait for the
        service to report that it is ready.

        :param subProc: the sub process whose STDOUT/STDERR streams are monitored
        :param logDir: the logging directory, to hold stdout/stderr files
        :raises CrashGenError: if a reader thread is not alive shortly after being started
        :raises RuntimeError: if the status never becomes RUNNING during the wait
        '''
        self._status.set(Status.STATUS_STARTING)

        self._ipcQueue = Queue()  # type: Queue
        # The STDERR reader does not exist yet; define the attribute so that
        # the stop()/join() call on the failure path below can inspect it.
        self._thread2 = None

        self._thread = threading.Thread(  # First thread captures server OUTPUT
            target=self.svcOutputReader,
            args=(subProc.getStdOut(), self._ipcQueue, logDir))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()
        time.sleep(0.01)
        if not self._thread.is_alive():  # What happened?
            Logging.info("Failed to started process to monitor STDOUT")
            self.stop()
            raise CrashGenError("Failed to start thread to monitor STDOUT")
        Logging.info("Successfully started process to monitor STDOUT")

        self._thread2 = threading.Thread(  # 2nd thread captures server ERRORs
            target=self.svcErrorReader,
            args=(subProc.getStdErr(), self._ipcQueue, logDir))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()
        time.sleep(0.01)
        if not self._thread2.is_alive():
            self.stop()
            raise CrashGenError("Failed to start thread to monitor STDERR")

        # Wait for the service to start. Messages are deliberately not pumped
        # during start up; the STDOUT reader flips the status for us.
        for _ in range(100):
            time.sleep(1.0)
            Progress.emit(Progress.SERVICE_START_NAP)
            if self._status.isRunning():
                Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid()))
                return  # now we've started
            if self._status.isStopped():
                # The STDOUT reader reached EOF and marked the process as
                # stopped, so it can no longer become ready: stop waiting.
                break

        # TODO: handle failure-to-start better?
        # Display output before giving up: trim to the last 100 msgs, force output
        self.procIpcBatch(100, True)
        raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid()))
759

760 761 762 763 764 765 766 767 768
    def _verifyDnode(self, tInst: TdeInstance):
        """
        Check that "show dnodes" lists a dnode whose end point port equals the
        port of the given instance.

        :param tInst: the instance whose DB target is queried and whose port is expected
        :raises RuntimeError: (after a 10 second pause) if no listed dnode uses that port
        """
        dbc = DbConn.createNative(tInst.getDbTarget())
        dbc.open()
        try:
            dbc.query("show dnodes")
            # Columns: id,end_point,vnodes,cores,status,role,create_time,offline reason
            cols = dbc.getQueryResult()
            isValid = False
            for col in cols:
                ep = col[1].split(':')  # 10.1.30.2:6030
                print("Found ep={}".format(ep))
                if tInst.getPort() == int(ep[1]):  # That's us
                    isValid = True  # now we are valid
                    break
            if not isValid:
                print("Failed to start dnode, sleep for a while")
                time.sleep(10.0)
                raise RuntimeError("Failed to start Dnode, expected port not found: {}".
                    format(tInst.getPort()))
        finally:
            # Always release the connection; it used to stay open whenever the
            # check failed or a query raised.
            dbc.close()

783 784
    def stop(self):
        """
        Join the reader threads, then report on the final state.

        Can be called from both the main thread and a signal handler. When the
        status is "stopped" afterwards, up to the last 10 queued output lines
        are processed one final time; otherwise a warning is printed.
        """
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        self.join()  # wait for the reader threads

        # Check if it's really stopped
        if not self.getStatus().isStopped():
            print("WARNING: SMT did not terminate as expected")
            return

        lastLines = 10  # for last output
        self.procIpcBatch(lastLines)  # one last time
        Logging.debug("End of TDengine Service Output")
        Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
811 812 813

    def join(self):
        """
        Wait for whichever reader threads exist and then forget them.

        Only legal while the status is "stopping" or "stopped" (we may be
        stopping ourselves, or have been stopped/killed by others).

        :raises RuntimeError: when called in any other status
        """
        # TODO: sanity check
        status = self.getStatus()
        if not (status.isStopping() or status.isStopped()):
            raise RuntimeError(
                "SMT.Join(): Unexpected status: {}".format(self._status))

        if not (self._thread or self._thread2):
            Logging.warning("Joining empty thread, doing nothing")
            return

        if self._thread:  # STD OUT thread
            self._thread.join()
            self._thread = None
        if self._thread2:  # STD ERR thread
            self._thread2.join()
            self._thread2 = None
828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846

    def _trimQueue(self, targetSize):
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if (q.qsize() <= targetSize):  # no need to trim
            return

        Logging.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for i in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    # Marker text that svcOutputReader searches for in each decoded STDOUT line
    # while the status is "starting"; finding it leads to the RUNNING status.
    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        '''
        Process a batch of queued sub process output, until the IPC queue
        reads EMPTY.

        :param trimToTarget: when positive, the queue is first trimmed to about
            this many entries, so only the most recent lines are processed
        :param forceOutput: log each line at INFO level instead of DEBUG level
        '''
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
            except Empty:
                # No more output: we are done with THIS BATCH. This is the only
                # way out of the loop.
                print(".>", end="", flush=True)
                return

            self._printProgress("_o")
            if forceOutput:
                Logging.info('[TAOSD] ' + line)
            else:
                Logging.debug('[TAOSD] ' + line)

    # Two-character progress frames; _printProgress picks one per call using Dice.throw(4)
    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """
        Print msg followed by one of the _ProgressBars frames, then four
        backspaces so that the next output overwrites both.
        """
        print(msg, end="", flush=True)
        frame = self._ProgressBars[Dice.throw(4)]
        print(frame, end="", flush=True)
        # Back up over the 2-char msg plus the 2-char frame
        print('\b\b\b\b', end="", flush=True)

879 880 881 882 883 884 885 886 887 888 889 890 891
    # Distinct static types for raw versus decoded lines of sub process output
    BinaryLine = NewType('BinaryLine', bytes) # line with binary data, directly from STDOUT, etc.
    TextLine   = NewType('TextLine', str) # properly decoded, suitable for printing, etc.
    # NOTE(review): 'x' is not referenced by any code visible here; it looks like a leftover sample value - confirm before removing
    x = TextLine('xyz')
    
    @classmethod
    def _decodeBinLine(cls, bLine: BinaryLine) -> Optional[TextLine] :
        try:
            tLine = bLine.decode("utf-8").rstrip()
            return cls.TextLine(tLine)
        except UnicodeError:
            print("\nNon-UTF8 server output: {}\n".format(bLine.decode('cp437')))
            return None

892 893 894 895 896 897 898 899 900 901 902
    def svcOutputReader(self, out: IO, queue, logDir: str):
        '''
        The routine that processes the STDOUT stream of the sub process being
        managed, line by line, until EOF.

        Each line is written verbatim to <logDir>/stdout.log; lines that decode
        as UTF-8 are also put on the queue. While the status is "starting", a
        line containing TD_READY_MSG switches the status to RUNNING. At EOF the
        status is set to STOPPED.

        :param out: the IO stream object used to fetch the data from
        :param queue: the queue where we dump the roughly parsed line-by-line data
        :param logDir: where we should dump a verbatim output file
        '''
        try:
            os.makedirs(logDir, exist_ok=True)
            logFile = os.path.join(logDir, 'stdout.log')
            # "with" guarantees the log file is closed even if the loop raises
            with open(logFile, 'wb') as fOut:
                # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
                # NOTE(review): the very first line is read and dropped here, so
                # it reaches neither the log file nor the queue - confirm that
                # this is intended.
                out.readline()
                for bLine in iter(out.readline, b''):
                    fOut.write(bLine)
                    tLine = self._decodeBinLine(bLine)
                    if tLine is None:  # not valid UTF-8, already reported
                        continue

                    # This might block, and then causing "out" buffer to block
                    queue.put(tLine)
                    self._printProgress("_i")

                    if self._status.isStarting():  # we are starting, let's see if we have started
                        if self.TD_READY_MSG in tLine:  # found
                            Logging.info("Waiting for the service to become FULLY READY")
                            time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                            Logging.info("Service is now FULLY READY")  # TODO: more ID info here?
                            self._status.set(Status.STATUS_RUNNING)

                    # Trim the queue if necessary: TODO: try this 1 out of 10 times
                    self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

                    if self._status.isStopping():  # TODO: use thread status instead
                        # WAITING for stopping sub process to finish its output
                        print("_w", end="", flush=True)

            # EOF on the pipe, meaning sub process must have died
            Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
            self.setStatus(Status.STATUS_STOPPED)
        finally:
            out.close()  # Close the stream, also when something above raised
944

945 946 947 948
    def svcErrorReader(self, err: IO, queue, logDir: str):
        """
        Process the STDERR stream of the sub process being managed, line by
        line, until EOF.

        Each raw line is written verbatim to <logDir>/stderr.log and logged at
        INFO level.

        :param err: the IO stream object used to fetch the data from
        :param queue: accepted for symmetry with svcOutputReader; not used here
        :param logDir: where we should dump a verbatim output file
        """
        try:
            os.makedirs(logDir, exist_ok=True)
            logFile = os.path.join(logDir, 'stderr.log')
            # "with" guarantees the log file is closed even if the loop raises
            with open(logFile, 'wb') as fErr:
                for line in iter(err.readline, b''):
                    fErr.write(line)
                    Logging.info("TDengine STDERR: {}".format(line))
            Logging.info("EOF for TDengine STDERR")
        finally:
            err.close()  # Close the stream, also when something above raised