未验证 提交 841d7792 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2607 from taosdata/feature/crash_gen

Feature/crash gen
...@@ -1227,13 +1227,13 @@ class Task(): ...@@ -1227,13 +1227,13 @@ class Task():
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql) errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
self.logDebug(errMsg) self.logDebug(errMsg)
if gConfig.debug : if gConfig.debug :
raise # so that we see full stack # raise # so that we see full stack
else: # non-debug traceback.print_exc()
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"----------------------------\n") "----------------------------\n")
# sys.exit(-1) # sys.exit(-1)
self._err = err self._err = err
self._aborted = True self._aborted = True
except Exception as e : except Exception as e :
self.logInfo("Non-TAOS exception encountered") self.logInfo("Non-TAOS exception encountered")
self._err = e self._err = e
...@@ -1591,31 +1591,6 @@ class Dice(): ...@@ -1591,31 +1591,6 @@ class Dice():
raise RuntimeError("Cannot throw dice before seeding it") raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop) return random.randrange(start, stop)
# Anyone needing to carry out work should simply come here
# class WorkDispatcher():
# def __init__(self, dbState):
# # self.totalNumMethods = 2
# self.tasks = [
# # CreateTableTask(dbState), # Obsolete
# # DropTableTask(dbState),
# # AddDataTask(dbState),
# ]
# def throwDice(self):
# max = len(self.tasks) - 1
# dRes = random.randint(0, max)
# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
# return dRes
# def pickTask(self):
# dice = self.throwDice()
# return self.tasks[dice]
# def doWork(self, workerThread):
# task = self.pickTask()
# task.execute(workerThread)
class LoggingFilter(logging.Filter): class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord): def filter(self, record: logging.LogRecord):
if ( record.levelno >= logging.INFO ) : if ( record.levelno >= logging.INFO ) :
...@@ -1633,46 +1608,15 @@ class MyLoggingAdapter(logging.LoggerAdapter): ...@@ -1633,46 +1608,15 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager: class SvcManager:
MAX_QUEUE_SIZE = 10000
def __init__(self): def __init__(self):
print("Starting service manager") print("Starting TDengine Service Manager")
signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler) signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
self.ioThread = None
self.subProcess = None self.inSigHandler = False
self.shouldStop = False # self._status = MainExec.STATUS_RUNNING # set inside _startTaosService()
# self.status = MainExec.STATUS_RUNNING # set inside _startTaosService() self.svcMgrThread = None
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
print("_i", end="", flush=True)
# Trim the queue if necessary
oneTenthQSize = self.MAX_QUEUE_SIZE // 10
if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full?
print("Triming IPC queue by: {}".format(oneTenthQSize))
for i in range(0, oneTenthQSize) :
try:
queue.get_nowait()
except Empty:
break # break out of for loop, no more trimming
if self.shouldStop :
print("Stopping to read output from sub process")
break
# queue.put(line)
print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died
out.close()
def _doMenu(self): def _doMenu(self):
choice = "" choice = ""
...@@ -1695,10 +1639,10 @@ class SvcManager: ...@@ -1695,10 +1639,10 @@ class SvcManager:
def sigUsrHandler(self, signalNumber, frame) : def sigUsrHandler(self, signalNumber, frame) :
print("Interrupting main thread execution upon SIGUSR1") print("Interrupting main thread execution upon SIGUSR1")
if self.status != MainExec.STATUS_RUNNING : if self.inSigHandler : # already
print("Ignoring repeated SIG...") print("Ignoring repeated SIG...")
return # do nothing if it's already not running return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING self.inSigHandler = True
choice = self._doMenu() choice = self._doMenu()
if choice == "1" : if choice == "1" :
...@@ -1711,67 +1655,227 @@ class SvcManager: ...@@ -1711,67 +1655,227 @@ class SvcManager:
else: else:
raise RuntimeError("Invalid menu choice: {}".format(choice)) raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...") print("Sig INT Handler starting...")
if self.status != MainExec.STATUS_RUNNING : if self.inSigHandler :
print("Ignoring repeated SIG_INT...") print("Ignoring repeated SIG_INT...")
return return
self.inSigHandler = True
self.status = MainExec.STATUS_STOPPING # immediately set our status
self.stopTaosService() self.stopTaosService()
print("INT signal handler returning...") print("INT signal handler returning...")
self.inSigHandler = False
def sigHandlerResume(self) : def sigHandlerResume(self) :
print("Resuming TDengine service manager thread (main thread)...\n\n") print("Resuming TDengine service manager thread (main thread)...\n\n")
self.status = MainExec.STATUS_RUNNING
def _checkServiceManagerThread(self):
if self.svcMgrThread: # valid svc mgr thread
if self.svcMgrThread.isStopped(): # done?
self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
self.svcMgrThread = None # no more
def _procIpcAll(self):
while self.svcMgrThread : # for as long as the svc mgr thread is still here
self.svcMgrThread.procIpcBatch() # regular processing,
time.sleep(0.5) # pause, before next round
self._checkServiceManagerThread()
print("Service Manager Thread (with subprocess) has ended, main thread now exiting...")
def startTaosService(self):
    """Create and start the service manager thread, then show its first output.

    Raises:
        RuntimeError: if ``self.svcMgrThread`` is already set.
    """
    if self.svcMgrThread:
        raise RuntimeError("Cannot start TAOS service when one may already be running")

    # Publish the object before starting it, exactly as before, so that
    # stopTaosService() can find it while start-up is still in progress.
    self.svcMgrThread = ServiceManagerThread()  # create the object
    self.svcMgrThread.start()
    print("TAOS service started, printing out output...")
    # Trim the queue to 10 entries and force them out at INFO level.
    self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True)
    print("TAOS service started")
def stopTaosService(self, outputLines=20):
    """Stop the service manager thread and flush its remaining output.

    Args:
        outputLines: passed as the first positional argument of
            ``procIpcBatch()`` for the final batch (default 20).

    Raises:
        RuntimeError: if there is no service manager thread to stop.
    """
    print("Terminating Service Manager Thread (SMT) execution...")
    # Work on one snapshot of the reference; this method is reachable from
    # both run() and the SIGINT handler.
    thread = self.svcMgrThread
    if not thread:
        raise RuntimeError("Unexpected empty svc mgr thread")

    thread.stop()
    if thread.isStopped():
        thread.procIpcBatch(outputLines)  # one last time
        self.svcMgrThread = None
        print("----- End of TDengine Service Output -----\n")
        print("SMT execution terminated")
    else:
        print("WARNING: SMT did not terminate as expected")
def run(self):
    """Start the service, pump its output until it ends, then stop it if needed."""
    self.startTaosService()
    self._procIpcAll()  # pump/process all the messages
    # The signal handler may already have stopped and cleared the thread.
    if self.svcMgrThread:
        self.stopTaosService()  # should have started already
class ServiceManagerThread:
MAX_QUEUE_SIZE = 10000
def __init__(self):
self._tdeSubProcess = None
self._thread = None
self._status = None
def getStatus(self):
    """Return the current status value (``None`` before ``start()`` is called)."""
    return self._status
def isRunning(self):
    """Return True when the status is ``MainExec.STATUS_RUNNING``."""
    # return self._thread and self._thread.is_alive()
    return self._status == MainExec.STATUS_RUNNING
def isStopping(self):
    """Return True when the status is ``MainExec.STATUS_STOPPING``."""
    return self._status == MainExec.STATUS_STOPPING
def isStopped(self):
    """Return True when the status is ``MainExec.STATUS_STOPPED``."""
    return self._status == MainExec.STATUS_STOPPED
# Start the thread (with sub process), and wait for the sub service
# to become fully operational
def start(self):
    """Launch the TDengine sub process and its output-reader thread.

    After launching, polls once per second, up to 10 times, for the status
    to become ``MainExec.STATUS_RUNNING``. This method never sets that
    status itself.

    Raises:
        RuntimeError: if a reader thread or sub process already exists, or
            if the status has not become RUNNING after the 10 polls.
    """
    if self._thread:
        raise RuntimeError("Unexpected _thread")
    if self._tdeSubProcess:
        raise RuntimeError("TDengine sub process already created/running")

    self._status = MainExec.STATUS_STARTING

    self._tdeSubProcess = TdeSubProcess()
    self._tdeSubProcess.start()

    self._ipcQueue = Queue()
    self._thread = threading.Thread(
        target=self.svcOutputReader,
        args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
    self._thread.daemon = True  # thread dies with the program
    self._thread.start()

    # wait for service to start
    for _ in range(10):
        time.sleep(1.0)
        # self.procIpcBatch() # don't pump message during start up
        print("_zz_", end="", flush=True)
        if self._status == MainExec.STATUS_RUNNING:
            logger.info("[] TDengine service READY to process requests")
            return  # now we've started
    raise RuntimeError("TDengine service did not start successfully")  # TODO: handle this better?
def stop(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
if self.isStopped():
print("Service already stopped")
return
if self.isStopping():
print("Service is already being stopped")
return
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self._tdeSubProcess :
raise RuntimeError("sub process object missing")
def joinIoThread(self): self._status = MainExec.STATUS_STOPPING
if self.ioThread : self._tdeSubProcess.stop()
self.ioThread.join()
self.ioThread = None if self._tdeSubProcess.isRunning(): # still running
print("FAILED to stop sub process, it is still running... pid = {}".format(self.subProcess.pid))
else:
self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
def join(self):
# TODO: sanity check
if not self.isStopping():
raise RuntimeError("Unexpected status when ending svc mgr thread: {}".format(self._status))
if self._thread :
self._thread.join()
self._thread = None
self._status = MainExec.STATUS_STOPPED
else : else :
print("Joining empty thread, doing nothing") print("Joining empty thread, doing nothing")
def _trimQueue(self, targetSize):
if targetSize <= 0:
return # do nothing
q = self._ipcQueue
if (q.qsize() <= targetSize ) : # no need to trim
return
logger.debug("Triming IPC queue to target size: {}".format(targetSize))
itemsToTrim = q.qsize() - targetSize
for i in range(0, itemsToTrim) :
try:
q.get_nowait()
except Empty:
break # break out of for loop, no more trimming
TD_READY_MSG = "TDengine is initialized successfully" TD_READY_MSG = "TDengine is initialized successfully"
def _procIpcBatch(self): def procIpcBatch(self, trimToTarget = 0, forceOutput = False):
self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process, managed by IO thread # Process all the output generated by the underlying sub process, managed by IO thread
while True : print("<", end="", flush=True)
while True :
try: try:
line = self.ipcQueue.get_nowait() # getting output at fast speed line = self._ipcQueue.get_nowait() # getting output at fast speed
print("_o", end="", flush=True) self._printProgress("_o")
if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self.status = MainExec.STATUS_RUNNING
except Empty: except Empty:
# time.sleep(2.3) # wait only if there's no output # time.sleep(2.3) # wait only if there's no output
# no more output # no more output
print(".>", end="", flush=True)
return # we are done with THIS BATCH return # we are done with THIS BATCH
else: # got line else: # got line, printing out
print(line) if forceOutput:
logger.info(line)
else:
logger.debug(line)
print(">", end="", flush=True)
_ProgressBars = ["--", "//", "||", "\\\\"]
def _printProgress(self, msg):
    """Print ``msg`` plus a randomly chosen progress bar, then back the cursor up.

    The cursor is moved back over everything just written, so the next
    write starts where this one did.

    Args:
        msg: short marker text to show (callers pass 2-character strings).
    """
    print(msg, end="", flush=True)
    bars = self._ProgressBars
    pBar = bars[Dice.throw(len(bars))]
    print(pBar, end="", flush=True)
    # Back up by what was actually written instead of assuming 2 + 2 chars.
    print('\b' * (len(msg) + len(pBar)), end="", flush=True)
def _procIpcAll(self): def svcOutputReader(self, out: IO, queue):
while True : # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
print("<", end="", flush=True) # print("This is the svcOutput Reader...")
self._procIpcBatch() # process one batch # for line in out :
for line in iter(out.readline, b''):
# check if the ioThread is still running # print("Finished reading a line: {}".format(line))
if (not self.ioThread) or (not self.ioThread.is_alive()): # print("Adding item to queue...")
print("IO Thread (with subprocess) has ended, main thread now exiting...") line = line.decode("utf-8").rstrip()
self.stopTaosService() queue.put(line) # This might block, and then causing "out" buffer to block
self._procIpcBatch() # one more batch self._printProgress("_i")
return # TODO: maybe one last batch?
if self._status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
# Maybe handler says we should exit now if line.find(self.TD_READY_MSG) != -1 : # found
if self.shouldStop: self._status = MainExec.STATUS_RUNNING
print("Main thread ending all IPC processing with IOThread/SubProcess")
self._procIpcBatch() # one more batch # Trim the queue if necessary: TODO: try this 1 out of 10 times
return self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self.isStopping() : # TODO: use thread status instead
print("_w", end="", flush=True) # WAITING for stopping sub process to finish its outptu
# queue.put(line)
print("\nNo more output from IO thread managing TDengine service") # meaning sub process must have died
out.close()
class TdeSubProcess:
def __init__(self):
self.subProcess = None
print(">", end="", flush=True) def getStdOut(self):
time.sleep(0.5) return self.subProcess.stdout
def startTaosService(self): def isRunning(self):
return self.subProcess != None
def start(self):
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1'] # svcCmd = ['vmstat', '1']
...@@ -1782,41 +1886,18 @@ class SvcManager: ...@@ -1782,41 +1886,18 @@ class SvcManager:
svcCmd, svcCmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
# bufsize=1, # not supported in binary mode # bufsize=1, # not supported in binary mode
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
self.ipcQueue = Queue()
if self.ioThread :
raise RuntimeError("Corrupt thread state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
self.shouldStop = False # don't let the main loop stop def stop(self):
self.status = MainExec.STATUS_STARTING if not self.subProcess:
print("Sub process already stopped")
# wait for service to start
for i in range(0, 10) :
time.sleep(1.0)
self._procIpcBatch() # pump messages
print("_zz_", end="", flush=True)
if self.status == MainExec.STATUS_RUNNING :
print("TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stopTaosService(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self.subProcess :
print("Process already stopped")
return return
retCode = self.subProcess.poll() retCode = self.subProcess.poll()
if retCode : # valid return code, process ended if retCode : # valid return code, process ended
self.subProcess = None self.subProcess = None
else: # process still alive, let's interrupt it else: # process still alive, let's interrupt it
print("Sub process still running, sending SIG_INT and waiting for it to stop...") print("Sub process is running, sending SIG_INT and waiting for it to terminate...")
self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end
try : try :
self.subProcess.wait(10) self.subProcess.wait(10)
...@@ -1826,41 +1907,20 @@ class SvcManager: ...@@ -1826,41 +1907,20 @@ class SvcManager:
print("TDengine service process terminated successfully from SIG_INT") print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None self.subProcess = None
if self.subProcess and (not self.subProcess.poll()):
print("Sub process is still running... pid = {}".format(self.subProcess.pid))
self.shouldStop = True
self.joinIoThread()
def run(self):
self.startTaosService()
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
self._procIpcAll()
print("End of loop reading from IPC queue")
self.joinIoThread() # should have started already
print("SvcManager Run Finished")
class ClientManager: class ClientManager:
def __init__(self): def __init__(self):
print("Starting service manager") print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler)
self.status = MainExec.STATUS_RUNNING self._status = MainExec.STATUS_RUNNING
self.tc = None self.tc = None
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING : if self._status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...") print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...") print("Terminating program...")
self.tc.requestToStop() self.tc.requestToStop()
...@@ -1904,16 +1964,16 @@ class ClientManager: ...@@ -1904,16 +1964,16 @@ class ClientManager:
self._printLastNumbers() self._printLastNumbers()
dbManager = DbManager() # Regular function dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager) self.tc = ThreadCoordinator(thPool, dbManager)
self.tc.run() self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats())) # print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed())) # print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
if gConfig.auto_start_service : if gConfig.auto_start_service :
svcMgr.stopTaosService() svcMgr.stopTaosService()
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed())) # print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/ return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
...@@ -1924,9 +1984,9 @@ class ClientManager: ...@@ -1924,9 +1984,9 @@ class ClientManager:
class MainExec: class MainExec:
STATUS_STARTING = 1 STATUS_STARTING = 1
STATUS_RUNNING = 2 STATUS_RUNNING = 2
STATUS_STOPPING = 3 STATUS_STOPPING = 3
# STATUS_STOPPED = 3 # Not used yet STATUS_STOPPED = 4
@classmethod @classmethod
def runClient(cls): def runClient(cls):
...@@ -2015,10 +2075,6 @@ def main(): ...@@ -2015,10 +2075,6 @@ def main():
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
# Logging Stuff # Logging Stuff
global logger global logger
...@@ -2034,15 +2090,14 @@ def main(): ...@@ -2034,15 +2090,14 @@ def main():
else: else:
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
Dice.seed(0) # initial seeding of dice
# Run server or client # Run server or client
if gConfig.run_tdengine : # run server if gConfig.run_tdengine : # run server
MainExec.runService() MainExec.runService()
else : else :
return MainExec.runClient() return MainExec.runClient()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__": if __name__ == "__main__":
exitCode = main() exitCode = main()
# print("Exiting with code: {}".format(exitCode)) # print("Exiting with code: {}".format(exitCode))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册