提交 943bbe22 编写于 作者: S Steven Li

Enhanced crash_gen tool to report useful info when stuck

上级 3b419a1c
...@@ -42,6 +42,13 @@ import os ...@@ -42,6 +42,13 @@ import os
import io import io
import signal import signal
import traceback import traceback
try:
import psutil
except:
print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1)
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
raise Exception("Must be using Python 3") raise Exception("Must be using Python 3")
...@@ -69,8 +76,7 @@ class CrashGenError(Exception): ...@@ -69,8 +76,7 @@ class CrashGenError(Exception):
class WorkerThread: class WorkerThread:
def __init__(self, pool: ThreadPool, tid, def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
tc: ThreadCoordinator,
# te: TaskExecutor, # te: TaskExecutor,
): # note: main thread context! ): # note: main thread context!
# self._curStep = -1 # self._curStep = -1
...@@ -138,7 +144,12 @@ class WorkerThread: ...@@ -138,7 +144,12 @@ class WorkerThread:
# tc = ThreadCoordinator(None) # tc = ThreadCoordinator(None)
while True: while True:
tc = self._tc # Thread Coordinator, the overall master tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one try:
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
except threading.BrokenBarrierError as err: # main thread timed out
logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
break
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
...@@ -248,14 +259,14 @@ class ThreadCoordinator: ...@@ -248,14 +259,14 @@ class ThreadCoordinator:
def getDbManager(self) -> DbManager: def getDbManager(self) -> DbManager:
return self._dbManager return self._dbManager
def crossStepBarrier(self): def crossStepBarrier(self, timeout=None):
self._stepBarrier.wait() self._stepBarrier.wait(timeout)
def requestToStop(self): def requestToStop(self):
self._runStatus = MainExec.STATUS_STOPPING self._runStatus = MainExec.STATUS_STOPPING
self._execStats.registerFailure("User Interruption") self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask): def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = gConfig.max_steps # type: ignore maxSteps = gConfig.max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True return True
...@@ -265,6 +276,8 @@ class ThreadCoordinator: ...@@ -265,6 +276,8 @@ class ThreadCoordinator:
return True return True
if hasAbortedTask: if hasAbortedTask:
return True return True
if workerTimeout:
return True
return False return False
def _hasAbortedTask(self): # from execution of previous step def _hasAbortedTask(self): # from execution of previous step
...@@ -296,7 +309,7 @@ class ThreadCoordinator: ...@@ -296,7 +309,7 @@ class ThreadCoordinator:
# let other threads go past the pool barrier, but wait at the # let other threads go past the pool barrier, but wait at the
# thread gate # thread gate
logger.debug("[TRD] Main thread about to cross the barrier") logger.debug("[TRD] Main thread about to cross the barrier")
self.crossStepBarrier() self.crossStepBarrier(timeout=15)
self._stepBarrier.reset() # Other worker threads should now be at the "gate" self._stepBarrier.reset() # Other worker threads should now be at the "gate"
logger.debug("[TRD] Main thread finished crossing the barrier") logger.debug("[TRD] Main thread finished crossing the barrier")
...@@ -342,11 +355,21 @@ class ThreadCoordinator: ...@@ -342,11 +355,21 @@ class ThreadCoordinator:
self._execStats.startExec() # start the stop watch self._execStats.startExec() # start the stop watch
transitionFailed = False transitionFailed = False
hasAbortedTask = False hasAbortedTask = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask): workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not gConfig.debug: # print this only if we are not in debug mode if not gConfig.debug: # print this only if we are not in debug mode
print(".", end="", flush=True) print(".", end="", flush=True)
self._syncAtBarrier() # For now just cross the barrier try:
self._syncAtBarrier() # For now just cross the barrier
except threading.BrokenBarrierError as err:
logger.info("Main loop aborted, caused by worker thread time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, important thread info:")
ts = ThreadStacks()
ts.print(filterInternal=True)
workerTimeout = True
break
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
# We use this period to do house keeping work, when all worker # We use this period to do house keeping work, when all worker
...@@ -364,6 +387,8 @@ class ThreadCoordinator: ...@@ -364,6 +387,8 @@ class ThreadCoordinator:
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
logger.debug("Abnormal ending of main thraed") logger.debug("Abnormal ending of main thraed")
elif workerTimeout:
logger.debug("Abnormal ending of main thread, due to worker timeout")
else: # regular ending, workers waiting at "barrier" else: # regular ending, workers waiting at "barrier"
logger.debug("Regular ending, main thread waiting for all worker threads to stop...") logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
self._syncAtBarrier() self._syncAtBarrier()
...@@ -569,9 +594,7 @@ class DbConn: ...@@ -569,9 +594,7 @@ class DbConn:
# below implemented by child classes # below implemented by child classes
self.openByType() self.openByType()
logger.debug( logger.debug("[DB] data connection opened, type = {}".format(self._type))
"[DB] data connection opened, type = {}".format(
self._type))
self.isOpen = True self.isOpen = True
def resetDb(self): # reset the whole database, etc. def resetDb(self): # reset the whole database, etc.
...@@ -786,9 +809,7 @@ class DbConnNative(DbConn): ...@@ -786,9 +809,7 @@ class DbConnNative(DbConn):
self.__class__._connInfoDisplayed = True # updating CLASS variable self.__class__._connInfoDisplayed = True # updating CLASS variable
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
self._conn = taos.connect( self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
host=hostAddr,
config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor() self._cursor = self._conn.cursor()
self._cursor.execute('reset query cache') self._cursor.execute('reset query cache')
...@@ -1674,7 +1695,7 @@ class StateTransitionTask(Task): ...@@ -1674,7 +1695,7 @@ class StateTransitionTask(Task):
@classmethod @classmethod
def getRegTableName(cls, i): def getRegTableName(cls, i):
return "db.reg_table_{}".format(i) return "reg_table_{}".format(i)
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
super().execute(wt) super().execute(wt)
...@@ -1984,9 +2005,9 @@ class MyLoggingAdapter(logging.LoggerAdapter): ...@@ -1984,9 +2005,9 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class SvcManager: class SvcManager:
def __init__(self): def __init__(self):
print("Starting TDengine Service Manager") print("Starting TDengine Service Manager")
signal.signal(signal.SIGTERM, self.sigIntHandler) # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
signal.signal(signal.SIGINT, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
self.inSigHandler = False self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside # self._status = MainExec.STATUS_RUNNING # set inside
...@@ -2035,14 +2056,14 @@ class SvcManager: ...@@ -2035,14 +2056,14 @@ class SvcManager:
self.inSigHandler = False self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...") print("SvcManager: INT Signal Handler starting...")
if self.inSigHandler: if self.inSigHandler:
print("Ignoring repeated SIG_INT...") print("Ignoring repeated SIG_INT...")
return return
self.inSigHandler = True self.inSigHandler = True
self.stopTaosService() self.stopTaosService()
print("INT signal handler returning...") print("SvcManager: INT Signal Handler returning...")
self.inSigHandler = False self.inSigHandler = False
def sigHandlerResume(self): def sigHandlerResume(self):
...@@ -2064,8 +2085,16 @@ class SvcManager: ...@@ -2064,8 +2085,16 @@ class SvcManager:
def startTaosService(self): def startTaosService(self):
if self.svcMgrThread: if self.svcMgrThread:
raise RuntimeError( raise RuntimeError("Cannot start TAOS service when one may already be running")
"Cannot start TAOS service when one may already be running")
# Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter():
if proc.name() == 'taosd':
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
time.sleep(2.0)
proc.kill()
# print("Process: {}".format(proc.name()))
self.svcMgrThread = ServiceManagerThread() # create the object self.svcMgrThread = ServiceManagerThread() # create the object
self.svcMgrThread.start() self.svcMgrThread.start()
print("TAOS service started, printing out output...") print("TAOS service started, printing out output...")
...@@ -2075,9 +2104,11 @@ class SvcManager: ...@@ -2075,9 +2104,11 @@ class SvcManager:
print("TAOS service started") print("TAOS service started")
def stopTaosService(self, outputLines=20): def stopTaosService(self, outputLines=20):
if not self.isRunning():
logger.warning("Cannot stop TAOS service, not running")
return
print("Terminating Service Manager Thread (SMT) execution...") print("Terminating Service Manager Thread (SMT) execution...")
if not self.svcMgrThread:
raise RuntimeError("Unexpected empty svc mgr thread")
self.svcMgrThread.stop() self.svcMgrThread.stop()
if self.svcMgrThread.isStopped(): if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time self.svcMgrThread.procIpcBatch(outputLines) # one last time
...@@ -2090,9 +2121,11 @@ class SvcManager: ...@@ -2090,9 +2121,11 @@ class SvcManager:
def run(self): def run(self):
self.startTaosService() self.startTaosService()
self._procIpcAll() # pump/process all the messages self._procIpcAll() # pump/process all the messages
if self.svcMgrThread: # if sig handler hasn't destroyed it by now if self.isRunning(): # if sig handler hasn't destroyed it by now
self.stopTaosService() # should have started already self.stopTaosService() # should have started already
def isRunning(self):
return self.svcMgrThread != None
class ServiceManagerThread: class ServiceManagerThread:
MAX_QUEUE_SIZE = 10000 MAX_QUEUE_SIZE = 10000
...@@ -2144,6 +2177,7 @@ class ServiceManagerThread: ...@@ -2144,6 +2177,7 @@ class ServiceManagerThread:
logger.info("[] TDengine service READY to process requests") logger.info("[] TDengine service READY to process requests")
return # now we've started return # now we've started
# TODO: handle this better? # TODO: handle this better?
self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully") raise RuntimeError("TDengine service did not start successfully")
def stop(self): def stop(self):
...@@ -2292,6 +2326,15 @@ class TdeSubProcess: ...@@ -2292,6 +2326,15 @@ class TdeSubProcess:
taosdPath = self.getBuildPath() + "/build/bin/taosd" taosdPath = self.getBuildPath() + "/build/bin/taosd"
cfgPath = self.getBuildPath() + "/test/cfg" cfgPath = self.getBuildPath() + "/test/cfg"
# Delete the log files
logPath = self.getBuildPath() + "/test/log"
# ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ]
for f in filelist:
filePath = os.path.join(logPath, f)
print("Removing log file: {}".format(filePath))
os.remove(filePath)
svcCmd = [taosdPath, '-c', cfgPath] svcCmd = [taosdPath, '-c', cfgPath]
# svcCmd = ['vmstat', '1'] # svcCmd = ['vmstat', '1']
if self.subProcess: # already there if self.subProcess: # already there
...@@ -2325,16 +2368,46 @@ class TdeSubProcess: ...@@ -2325,16 +2368,46 @@ class TdeSubProcess:
print("TDengine service process terminated successfully from SIG_INT") print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None self.subProcess = None
class ThreadStacks: # stack info for all threads
def __init__(self):
self._allStacks = {}
allFrames = sys._current_frames()
for th in threading.enumerate():
stack = traceback.extract_stack(allFrames[th.ident])
self._allStacks[th.native_id] = stack
def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread
lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match
continue
if filterInternal:
if lastFrame.name in ['wait', 'invoke_excepthook',
'_wait', # The Barrier exception
'svcOutputReader', # the svcMgr thread
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
print("\n<----- Thread Info for ID: {}".format(thNid))
for frame in stack:
# print(frame)
print("File {filename}, line {lineno}, in {name}".format(
filename=frame.filename, lineno=frame.lineno, name=frame.name))
print(" {}".format(frame.line))
print("-----> End of Thread Info\n")
class ClientManager: class ClientManager:
def __init__(self): def __init__(self):
print("Starting service manager") print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler) # signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler)
self._status = MainExec.STATUS_RUNNING self._status = MainExec.STATUS_RUNNING
self.tc = None self.tc = None
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
if self._status != MainExec.STATUS_RUNNING: if self._status != MainExec.STATUS_RUNNING:
print("Repeated SIGINT received, forced exit...") print("Repeated SIGINT received, forced exit...")
...@@ -2342,9 +2415,50 @@ class ClientManager: ...@@ -2342,9 +2415,50 @@ class ClientManager:
sys.exit(-1) sys.exit(-1)
self._status = MainExec.STATUS_STOPPING # immediately set our status self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...") print("ClientManager: Terminating program...")
self.tc.requestToStop() self.tc.requestToStop()
def _doMenu(self):
choice = ""
while True:
print("\nInterrupting Client Program, Choose an Action: ")
print("1: Resume")
print("2: Terminate")
print("3: Show Threads")
# Remember to update the if range below
# print("Enter Choice: ", end="", flush=True)
while choice == "":
choice = input("Enter Choice: ")
if choice != "":
break # done with reading repeated input
if choice in ["1", "2", "3"]:
break # we are done with whole method
print("Invalid choice, please try again.")
choice = "" # reset
return choice
def sigUsrHandler(self, signalNumber, frame):
print("Interrupting main thread execution upon SIGUSR1")
if self.inSigHandler: # already
print("Ignoring repeated SIG_USR1...")
return # do nothing if it's already not running
self.inSigHandler = True
choice = self._doMenu()
if choice == "1":
print("Resuming execution...")
time.sleep(1.0)
elif choice == "2":
print("Not implemented yet")
time.sleep(1.0)
elif choice == "3":
ts = ThreadStacks()
ts.print()
else:
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def _printLastNumbers(self): # to verify data durability def _printLastNumbers(self): # to verify data durability
dbManager = DbManager(resetDb=False) dbManager = DbManager(resetDb=False)
dbc = dbManager.getDbConn() dbc = dbManager.getDbConn()
...@@ -2377,11 +2491,7 @@ class ClientManager: ...@@ -2377,11 +2491,7 @@ class ClientManager:
def prepare(self): def prepare(self):
self._printLastNumbers() self._printLastNumbers()
def run(self): def run(self, svcMgr):
if gConfig.auto_start_service:
svcMgr = SvcManager()
svcMgr.startTaosService()
self._printLastNumbers() self._printLastNumbers()
dbManager = DbManager() # Regular function dbManager = DbManager() # Regular function
...@@ -2391,7 +2501,7 @@ class ClientManager: ...@@ -2391,7 +2501,7 @@ class ClientManager:
self.tc.run() self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats())) # print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed())) # print("TC failed = {}".format(self.tc.isFailed()))
if gConfig.auto_start_service: if svcMgr: # gConfig.auto_start_service:
svcMgr.stopTaosService() svcMgr.stopTaosService()
# Print exec status, etc., AFTER showing messages from the server # Print exec status, etc., AFTER showing messages from the server
self.conclude() self.conclude()
...@@ -2410,18 +2520,39 @@ class MainExec: ...@@ -2410,18 +2520,39 @@ class MainExec:
STATUS_STOPPING = 3 STATUS_STOPPING = 3
STATUS_STOPPED = 4 STATUS_STOPPED = 4
@classmethod def __init__(self):
def runClient(cls): self._clientMgr = None
clientManager = ClientManager() self._svcMgr = None
return clientManager.run()
@classmethod signal.signal(signal.SIGTERM, self.sigIntHandler)
def runService(cls): signal.signal(signal.SIGINT, self.sigIntHandler)
svcManager = SvcManager() signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
svcManager.run()
@classmethod def sigUsrHandler(self, signalNumber, frame):
def runTemp(cls): # for debugging purposes if self._clientMgr:
self._clientMgr.sigUsrHandler(signalNumber, frame)
elif self._svcMgr: # Only if no client mgr, we are running alone
self._svcMgr.sigUsrHandler(signalNumber, frame)
def sigIntHandler(self, signalNumber, frame):
if self._svcMgr:
self._svcMgr.sigIntHandler(signalNumber, frame)
if self._clientMgr:
self._clientMgr.sigIntHandler(signalNumber, frame)
def runClient(self):
if gConfig.auto_start_service:
self._svcMgr = SvcManager()
self._svcMgr.startTaosService() # we start, don't run
self._clientMgr = ClientManager()
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
def runService(self):
self._svcMgr = SvcManager()
self._svcMgr.run() # run to some end state
def runTemp(self): # for debugging purposes
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn() # dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName() # sTbName = dbState.getFixedSuperTableName()
...@@ -2577,10 +2708,11 @@ def main(): ...@@ -2577,10 +2708,11 @@ def main():
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
# Run server or client # Run server or client
mExec = MainExec()
if gConfig.run_tdengine: # run server if gConfig.run_tdengine: # run server
MainExec.runService() mExec.runService()
else: else:
return MainExec.runClient() return mExec.runClient()
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册