提交 c6a5706f 编写于 作者: S Steven Li

Multi-instance code working for single instance case, ready to refactor...

Multi-instance code working for single instance case, ready to refactor crash_gen tool into multiple files
上级 fb9376be
...@@ -1780,8 +1780,8 @@ class Task(): ...@@ -1780,8 +1780,8 @@ class Task():
return True return True
elif msg.find("duplicated column names") != -1: # also alter table tag issues elif msg.find("duplicated column names") != -1: # also alter table tag issues
return True return True
elif (gSvcMgr!=None) and gSvcMgr.isRestarting(): elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ...
logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg)) logger.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
return True return True
return False # Not an acceptable error return False # Not an acceptable error
...@@ -2451,8 +2451,9 @@ class MyLoggingAdapter(logging.LoggerAdapter): ...@@ -2451,8 +2451,9 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class ServiceManager: class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
def __init__(self): def __init__(self, numDnodes = 1):
print("Starting TDengine Service Manager") logger.info("TDengine Service Manager (TSM) created")
self._numDnodes = numDnodes # >1 means we have a cluster
# signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
# signal.signal(signal.SIGINT, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler)
# signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
...@@ -2460,9 +2461,12 @@ class ServiceManager: ...@@ -2460,9 +2461,12 @@ class ServiceManager:
self.inSigHandler = False self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside # self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService() # _startTaosService()
self.svcMgrThread = None # type: ServiceManagerThread self.svcMgrThreads = [] # type: List[ServiceManagerThread]
for i in range(0, numDnodes):
self.svcMgrThreads.append(ServiceManagerThread(i))
self._lock = threading.Lock() self._lock = threading.Lock()
self._isRestarting = False # self._isRestarting = False
def _doMenu(self): def _doMenu(self):
choice = "" choice = ""
...@@ -2494,7 +2498,7 @@ class ServiceManager: ...@@ -2494,7 +2498,7 @@ class ServiceManager:
if choice == "1": if choice == "1":
self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
elif choice == "2": elif choice == "2":
self.stopTaosService() self.stopTaosServices()
elif choice == "3": # Restart elif choice == "3": # Restart
self.restart() self.restart()
else: else:
...@@ -2509,33 +2513,70 @@ class ServiceManager: ...@@ -2509,33 +2513,70 @@ class ServiceManager:
return return
self.inSigHandler = True self.inSigHandler = True
self.stopTaosService() self.stopTaosServices()
print("ServiceManager: INT Signal Handler returning...") print("ServiceManager: INT Signal Handler returning...")
self.inSigHandler = False self.inSigHandler = False
def sigHandlerResume(self): def sigHandlerResume(self):
print("Resuming TDengine service manager (main thread)...\n\n") print("Resuming TDengine service manager (main thread)...\n\n")
def _updateThreadStatus(self): # def _updateThreadStatus(self):
if self.svcMgrThread: # valid svc mgr thread # if self.svcMgrThread: # valid svc mgr thread
if self.svcMgrThread.isStopped(): # done? # if self.svcMgrThread.isStopped(): # done?
self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate? # self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
self.svcMgrThread = None # no more # self.svcMgrThread = None # no more
def isActive(self):
"""
Determine if the service/cluster is active at all, i.e. at least
one thread is not "stopped".
"""
for thread in self.svcMgrThreads:
if not thread.isStopped():
return True
return False
# def isRestarting(self):
# """
# Determine if the service/cluster is being "restarted", i.e., at least
# one thread is in "restarting" status
# """
# for thread in self.svcMgrThreads:
# if thread.isRestarting():
# return True
# return False
def isStable(self):
"""
Determine if the service/cluster is "stable", i.e. all of the
threads are in "stable" status.
"""
for thread in self.svcMgrThreads:
if not thread.isStable():
return False
return True
def _procIpcAll(self): def _procIpcAll(self):
while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here while self.isActive():
if self.isRunning(): for thread in self.svcMgrThreads: # all thread objects should always be valid
self.svcMgrThread.procIpcBatch() # regular processing, # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
self._updateThreadStatus() if thread.isRunning():
elif self.isRetarting(): thread.procIpcBatch() # regular processing,
if thread.isStopped():
thread.procIpcBatch() # one last time?
# self._updateThreadStatus()
elif thread.isRetarting():
print("Service restarting...") print("Service restarting...")
# else this thread is stopped
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
# raise CrashGenError("dummy")
print("Service Manager Thread (with subprocess) ended, main thread exiting...") print("Service Manager Thread (with subprocess) ended, main thread exiting...")
def startTaosService(self): def startTaosServices(self):
with self._lock: with self._lock:
if self.svcMgrThread: if self.isActive():
raise RuntimeError("Cannot start TAOS service when one may already be running") raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")
# Find if there's already a taosd service, and then kill it # Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter(): for proc in psutil.process_iter():
...@@ -2545,53 +2586,45 @@ class ServiceManager: ...@@ -2545,53 +2586,45 @@ class ServiceManager:
proc.kill() proc.kill()
# print("Process: {}".format(proc.name())) # print("Process: {}".format(proc.name()))
self.svcMgrThread = ServiceManagerThread() # create the object # self.svcMgrThread = ServiceManagerThread() # create the object
print("Attempting to start TAOS service started, printing out output...") for thread in self.svcMgrThreads:
self.svcMgrThread.start() thread.start()
self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines thread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
print("TAOS service started")
def stopTaosService(self, outputLines=20): def stopTaosServices(self):
with self._lock: with self._lock:
if not self.isRunning(): if not self.isActive():
logger.warning("Cannot stop TAOS service, not running") logger.warning("Cannot stop TAOS service(s), already not active")
return return
print("Terminating Service Manager Thread (SMT) execution...") for thread in self.svcMgrThreads:
self.svcMgrThread.stop() thread.stop()
if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time
self.svcMgrThread = None
print("End of TDengine Service Output")
print("----- TDengine Service (managed by SMT) is now terminated -----\n")
else:
print("WARNING: SMT did not terminate as expected")
def run(self): def run(self):
self.startTaosService() self.startTaosServices()
self._procIpcAll() # pump/process all the messages, may encounter SIG + restart self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
if self.isRunning(): # if sig handler hasn't destroyed it by now if self.isActive(): # if sig handler hasn't destroyed it by now
self.stopTaosService() # should have started already self.stopTaosServices() # should have started already
def restart(self): def restart(self):
if self._isRestarting: if not self.isStable():
logger.warning("Cannot restart service when it's already restarting") logger.warning("Cannot restart service/cluster, when not stable")
return return
self._isRestarting = True # self._isRestarting = True
if self.isRunning(): if self.isActive():
self.stopTaosService() self.stopTaosServices()
else: else:
logger.warning("Service not running when restart requested") logger.warning("Service not active when restart requested")
self.startTaosService() self.startTaosService()
self._isRestarting = False # self._isRestarting = False
def isRunning(self): # def isRunning(self):
return self.svcMgrThread != None # return self.svcMgrThread != None
def isRestarting(self): # def isRestarting(self):
return self._isRestarting # return self._isRestarting
class ServiceManagerThread: class ServiceManagerThread:
""" """
...@@ -2602,15 +2635,26 @@ class ServiceManagerThread: ...@@ -2602,15 +2635,26 @@ class ServiceManagerThread:
""" """
MAX_QUEUE_SIZE = 10000 MAX_QUEUE_SIZE = 10000
def __init__(self, tInst : TdeInstance = None): def __init__(self, tInstNum = 0, tInst : TdeInstance = None):
# Set the sub process
self._tdeSubProcess = None # type: TdeSubProcess self._tdeSubProcess = None # type: TdeSubProcess
# Arrange the TDengine instance
self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
self._tInst = tInst or TdeInstance() # Need an instance self._tInst = tInst or TdeInstance() # Need an instance
self._thread = None
self._status = None self._thread = None # The actual thread, # type: threading.Thread
self._status = MainExec.STATUS_STOPPED # The status of the underlying service, actually.
def __repr__(self):
return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum)
def getStatus(self): def getStatus(self):
return self._status return self._status
def isStarting(self):
return self._status == MainExec.STATUS_STARTING
def isRunning(self): def isRunning(self):
# return self._thread and self._thread.is_alive() # return self._thread and self._thread.is_alive()
return self._status == MainExec.STATUS_RUNNING return self._status == MainExec.STATUS_RUNNING
...@@ -2621,6 +2665,9 @@ class ServiceManagerThread: ...@@ -2621,6 +2665,9 @@ class ServiceManagerThread:
def isStopped(self): def isStopped(self):
return self._status == MainExec.STATUS_STOPPED return self._status == MainExec.STATUS_STOPPED
def isStable(self):
return self.isRunning() or self.isStopped()
# Start the thread (with sub process), and wait for the sub service # Start the thread (with sub process), and wait for the sub service
# to become fully operational # to become fully operational
def start(self): def start(self):
...@@ -2629,8 +2676,9 @@ class ServiceManagerThread: ...@@ -2629,8 +2676,9 @@ class ServiceManagerThread:
if self._tdeSubProcess: if self._tdeSubProcess:
raise RuntimeError("TDengine sub process already created/running") raise RuntimeError("TDengine sub process already created/running")
self._status = MainExec.STATUS_STARTING logger.info("Attempting to start TAOS service: {}".format(self))
self._status = MainExec.STATUS_STARTING
self._tdeSubProcess = TdeSubProcess(self._tInst) self._tdeSubProcess = TdeSubProcess(self._tInst)
self._tdeSubProcess.start() self._tdeSubProcess.start()
...@@ -2654,10 +2702,11 @@ class ServiceManagerThread: ...@@ -2654,10 +2702,11 @@ class ServiceManagerThread:
print("_zz_", end="", flush=True) print("_zz_", end="", flush=True)
if self._status == MainExec.STATUS_RUNNING: if self._status == MainExec.STATUS_RUNNING:
logger.info("[] TDengine service READY to process requests") logger.info("[] TDengine service READY to process requests")
logger.info("[] TAOS service started: {}".format(self))
return # now we've started return # now we've started
# TODO: handle this better? # TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully") raise RuntimeError("TDengine service did not start successfully: {}".format(self))
def stop(self): def stop(self):
# can be called from both main thread or signal handler # can be called from both main thread or signal handler
...@@ -2687,6 +2736,15 @@ class ServiceManagerThread: ...@@ -2687,6 +2736,15 @@ class ServiceManagerThread:
self._tdeSubProcess = None # not running any more self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc. self.join() # stop the thread, change the status, etc.
# Check if it's really stopped
outputLines = 20 # for last output
if self.isStopped():
self.procIpcBatch(outputLines) # one last time
print("End of TDengine Service Output: {}".format(self))
print("----- TDengine Service (managed by SMT) is now terminated -----\n")
else:
print("WARNING: SMT did not terminate as expected: {}".format(self))
def join(self): def join(self):
# TODO: sanity check # TODO: sanity check
if not self.isStopping(): if not self.isStopping():
...@@ -2770,7 +2828,7 @@ class ServiceManagerThread: ...@@ -2770,7 +2828,7 @@ class ServiceManagerThread:
if line.find(self.TD_READY_MSG) != -1: # found if line.find(self.TD_READY_MSG) != -1: # found
logger.info("Waiting for the service to become FULLY READY") logger.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this time.sleep(1.0) # wait for the server to truly start. TODO: remove this
logger.info("Service is now FULLY READY") logger.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
self._status = MainExec.STATUS_RUNNING self._status = MainExec.STATUS_RUNNING
# Trim the queue if necessary: TODO: try this 1 out of 10 times # Trim the queue if necessary: TODO: try this 1 out of 10 times
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册