diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 173b6c448cc70514f7e738319de2487706a67cb5..b66e8d48b6f6ed51e621cfcb79d3e0de7e7e07c4 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -889,7 +889,7 @@ class StateEmpty(AnyState): def verifyTasksToState(self, tasks, newState): if (self.hasSuccess(tasks, TaskCreateDb) - ): # at EMPTY, if there's succes in creating DB + ): # at EMPTY, if there's succes in creating DB if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks # we must have at most one. TODO: compare numbers self.assertAtMostOneSuccess(tasks, TaskCreateDb) @@ -944,7 +944,7 @@ class StateSuperTableOnly(AnyState): def verifyTasksToState(self, tasks, newState): if (self.hasSuccess(tasks, TaskDropSuperTable) - ): # we are able to drop the table + ): # we are able to drop the table #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # we must have had recreted it self.hasSuccess(tasks, TaskCreateSuperTable) @@ -978,7 +978,7 @@ class StateHasData(AnyState): self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif (newState.equals(AnyState.STATE_DB_ONLY)): # in DB only if (not self.hasTask(tasks, TaskCreateDb) - ): # without a create_db task + ): # without a create_db task # we must have drop_db task self.assertNoTask(tasks, TaskDropDb) self.hasSuccess(tasks, TaskDropSuperTable) @@ -990,11 +990,11 @@ class StateHasData(AnyState): # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA if (not self.hasTask(tasks, TaskCreateDb) - ): # only if we didn't create one + ): # only if we didn't create one # we shouldn't have dropped it self.assertNoTask(tasks, TaskDropDb) if (not self.hasTask(tasks, TaskCreateSuperTable) - ): # if we didn't create the table + ): # if we didn't create the table # we should not have a task that drops it self.assertNoTask(tasks, TaskDropSuperTable) # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) @@ -1385,15 +1385,18 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme - if ( errno2 in [ - 0x05, # TSDB_CODE_RPC_NOT_READY - 0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, - 0x510, # vnode not in ready state + errno2 = err.errno if ( + err.errno > 0) else 0x80000000 + err.errno # correct error scheme + if (errno2 in [ + 0x05, # TSDB_CODE_RPC_NOT_READY + 0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, + 0x510, # vnode not in ready state 0x600, - 1000 # REST catch-all error - ]) : # allowed errors - self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + 1000 # REST catch-all error + ]): # allowed errors + self.logDebug( + "[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( + errno2, err, self._lastSql)) print("_", end="", flush=True) self._err = err else: @@ -1862,7 +1865,8 @@ class MyLoggingAdapter(logging.LoggerAdapter): return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs -class SvcManager: + +class SvcManager: MAX_QUEUE_SIZE = 10000 def __init__(self): @@ -1873,35 +1877,39 @@ class SvcManager: self.ioThread = None self.subProcess = None self.shouldStop = False - # self.status = MainExec.STATUS_RUNNING # set inside _startTaosService() + # self.status = MainExec.STATUS_RUNNING # set inside + # _startTaosService() def svcOutputReader(self, out: IO, queue): - # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python + # Important Reference: + # https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python print("This is the svcOutput Reader...") - # for line in out : + # for line in out : for line in iter(out.readline, b''): # print("Finished reading a line: {}".format(line)) # print("Adding item to queue...") line = line.decode("utf-8").rstrip() - queue.put(line) # This might block, and then causing "out" buffer to block - print("_i", end="", flush=True) + # This might block, and then causing "out" buffer to block + queue.put(line) + print("_i", end="", flush=True) # Trim the queue if necessary oneTenthQSize = self.MAX_QUEUE_SIZE // 10 - if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full? + if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize)): # 90% full? print("Triming IPC queue by: {}".format(oneTenthQSize)) - for i in range(0, oneTenthQSize) : + for i in range(0, oneTenthQSize): try: queue.get_nowait() except Empty: - break # break out of for loop, no more trimming + break # break out of for loop, no more trimming - if self.shouldStop : + if self.shouldStop: print("Stopping to read output from sub process") break # queue.put(line) - print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died + # meaning sub process must have died + print("\nNo more output (most likely) from IO thread managing TDengine service") out.close() def _doMenu(self): @@ -1912,30 +1920,32 @@ class SvcManager: print("2: Terminate") print("3: Restart") # Remember to update the if range below - # print("Enter Choice: ", end="", flush=True) + # print("Enter Choice: ", end="", flush=True) while choice == "": choice = input("Enter Choice: ") if choice != "": - break # done with reading repeated input - if choice in ["1", "2", "3"]: - break # we are done with whole method + break # done with reading repeated input + if choice in ["1", "2", "3"]: + break # we are done with whole method print("Invalid choice, please try again.") - choice = "" # reset + choice = "" # reset return choice - def sigUsrHandler(self, signalNumber, frame) : + def sigUsrHandler(self, signalNumber, frame): print("Interrupting main thread execution upon SIGUSR1") - if self.status != MainExec.STATUS_RUNNING : + if self.status != MainExec.STATUS_RUNNING: print("Ignoring repeated SIG...") - return # do nothing if it's already not running + return # do nothing if it's already not running self.status = MainExec.STATUS_STOPPING choice = self._doMenu() - if choice == "1" : - self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? - elif choice == "2" : + if choice == "1": + # TODO: can the sub-process be blocked due to us not reading from + # queue? + self.sigHandlerResume() + elif choice == "2": self.stopTaosService() - elif choice == "3" : + elif choice == "3": self.stopTaosService() self.startTaosService() else: @@ -1943,59 +1953,62 @@ class SvcManager: def sigIntHandler(self, signalNumber, frame): print("Sig INT Handler starting...") - if self.status != MainExec.STATUS_RUNNING : + if self.status != MainExec.STATUS_RUNNING: print("Ignoring repeated SIG_INT...") return - self.status = MainExec.STATUS_STOPPING # immediately set our status - self.stopTaosService() - print("INT signal handler returning...") + self.status = MainExec.STATUS_STOPPING # immediately set our status + self.stopTaosService() + print("INT signal handler returning...") - def sigHandlerResume(self) : + def sigHandlerResume(self): print("Resuming TDengine service manager thread (main thread)...\n\n") self.status = MainExec.STATUS_RUNNING def joinIoThread(self): if self.ioThread: self.ioThread.join() - self.ioThread = None - else : + self.ioThread = None + else: print("Joining empty thread, doing nothing") - + TD_READY_MSG = "TDengine is initialized successfully" + def _procIpcBatch(self): - # Process all the output generated by the underlying sub process, managed by IO thread - while True : - try: - line = self.ipcQueue.get_nowait() # getting output at fast speed - print("_o", end="", flush=True) - if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started - if line.find(self.TD_READY_MSG) != -1 : # found + # Process all the output generated by the underlying sub process, + # managed by IO thread + while True: + try: + line = self.ipcQueue.get_nowait() # getting output at fast speed + print("_o", end="", flush=True) + if self.status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started + if line.find(self.TD_READY_MSG) != -1: # found self.status = MainExec.STATUS_RUNNING - + except Empty: # time.sleep(2.3) # wait only if there's no output # no more output - return # we are done with THIS BATCH - else: # got line + return # we are done with THIS BATCH + else: # got line print(line) def _procIpcAll(self): - while True : + while True: print("<", end="", flush=True) - self._procIpcBatch() # process one batch + self._procIpcBatch() # process one batch # check if the ioThread is still running if (not self.ioThread) or (not self.ioThread.is_alive()): - print("IO Thread (with subprocess) has ended, main thread now exiting...") + print( + "IO Thread (with subprocess) has ended, main thread now exiting...") self.stopTaosService() - self._procIpcBatch() # one more batch - return # TODO: maybe one last batch? + self._procIpcBatch() # one more batch + return # TODO: maybe one last batch? # Maybe handler says we should exit now if self.shouldStop: print("Main thread ending all IPC processing with IOThread/SubProcess") - self._procIpcBatch() # one more batch + self._procIpcBatch() # one more batch return print(">", end="", flush=True) @@ -2024,50 +2037,58 @@ class SvcManager: svcCmd = [taosdPath, '-c', cfgPath] # svcCmd = ['vmstat', '1'] - if self.subProcess : # already there + if self.subProcess: # already there raise RuntimeError("Corrupt process state") self.subProcess = subprocess.Popen( - svcCmd, - stdout=subprocess.PIPE, + svcCmd, + stdout=subprocess.PIPE, # bufsize=1, # not supported in binary mode - close_fds=ON_POSIX) # had text=True, which interferred with reading EOF + close_fds=ON_POSIX) # had text=True, which interferred with reading EOF self.ipcQueue = Queue() - if self.ioThread : + if self.ioThread: raise RuntimeError("Corrupt thread state") - self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue)) - self.ioThread.daemon = True # thread dies with the program + self.ioThread = threading.Thread( + target=self.svcOutputReader, args=( + self.subProcess.stdout, self.ipcQueue)) + self.ioThread.daemon = True # thread dies with the program self.ioThread.start() - self.shouldStop = False # don't let the main loop stop + self.shouldStop = False # don't let the main loop stop self.status = MainExec.STATUS_STARTING # wait for service to start - for i in range(0, 10) : + for i in range(0, 10): time.sleep(1.0) - self._procIpcBatch() # pump messages + self._procIpcBatch() # pump messages print("_zz_", end="", flush=True) - if self.status == MainExec.STATUS_RUNNING : + if self.status == MainExec.STATUS_RUNNING: print("TDengine service READY to process requests") - return # now we've started - raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better? + return # now we've started + # TODO: handle this better? + raise RuntimeError("TDengine service did not start successfully") def stopTaosService(self): # can be called from both main thread or signal handler print("Terminating TDengine service running as the sub process...") - # Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes - if not self.subProcess : + # Linux will send Control-C generated SIGINT to the TDengine process + # already, ref: + # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes + if not self.subProcess: print("Process already stopped") return retCode = self.subProcess.poll() - if retCode : # valid return code, process ended + if retCode: # valid return code, process ended self.subProcess = None - else: # process still alive, let's interrupt it - print("Sub process still running, sending SIG_INT and waiting for it to stop...") - self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end - try : + else: # process still alive, let's interrupt it + print( + "Sub process still running, sending SIG_INT and waiting for it to stop...") + # sub process should end, then IPC queue should end, causing IO + # thread to end + self.subProcess.send_signal(signal.SIGINT) + try: self.subProcess.wait(10) except subprocess.TimeoutExpired as err: print("Time out waiting for TDengine service process to exit") @@ -2076,15 +2097,17 @@ class SvcManager: self.subProcess = None if self.subProcess and (not self.subProcess.poll()): - print("Sub process is still running... pid = {}".format(self.subProcess.pid)) - + print( + "Sub process is still running... pid = {}".format( + self.subProcess.pid)) + self.shouldStop = True self.joinIoThread() def run(self): self.startTaosService() - # proc = subprocess.Popen(['echo', '"to stdout"'], + # proc = subprocess.Popen(['echo', '"to stdout"'], # stdout=subprocess.PIPE, # ) # stdout_value = proc.communicate()[0] @@ -2093,7 +2116,7 @@ class SvcManager: self._procIpcAll() print("End of loop reading from IPC queue") - self.joinIoThread() # should have started already + self.joinIoThread() # should have started already print("SvcManager Run Finished") @@ -2148,7 +2171,7 @@ class ClientManager: self._printLastNumbers() def run(self): - if gConfig.auto_start_service : + if gConfig.auto_start_service: svcMgr = SvcManager() svcMgr.startTaosService() @@ -2163,7 +2186,7 @@ class ClientManager: # print("exec stats: {}".format(self.tc.getExecStats())) # print("TC failed = {}".format(self.tc.isFailed())) self.conclude() - if gConfig.auto_start_service : + if gConfig.auto_start_service: svcMgr.stopTaosService() # print("TC failed (2) = {}".format(self.tc.isFailed())) # Linux return code: ref https://shapeshed.com/unix-exit-codes/ @@ -2248,24 +2271,57 @@ def main(): ''')) - parser.add_argument('-a', '--auto-start-service', action='store_true', - help='Automatically start/stop the TDengine service (default: false)') - parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, - help='Connector type to use: native, rest, or mixed (default: 10)') - parser.add_argument('-d', '--debug', action='store_true', - help='Turn on DEBUG mode for more logging (default: false)') - parser.add_argument('-e', '--run-tdengine', action='store_true', - help='Run TDengine service in foreground (default: false)') - parser.add_argument('-l', '--larger-data', action='store_true', - help='Write larger amount of data during write operations (default: false)') - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - help='Use a single shared db connection (default: false)') - parser.add_argument('-r', '--record-ops', action='store_true', - help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, - help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, - help='Number of threads to run (default: 10)') + parser.add_argument( + '-a', + '--auto-start-service', + action='store_true', + help='Automatically start/stop the TDengine service (default: false)') + parser.add_argument( + '-c', + '--connector-type', + action='store', + default='native', + type=str, + help='Connector type to use: native, rest, or mixed (default: 10)') + parser.add_argument( + '-d', + '--debug', + action='store_true', + help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument( + '-e', + '--run-tdengine', + action='store_true', + help='Run TDengine service in foreground (default: false)') + parser.add_argument( + '-l', + '--larger-data', + action='store_true', + help='Write larger amount of data during write operations (default: false)') + parser.add_argument( + '-p', + '--per-thread-db-connection', + action='store_true', + help='Use a single shared db connection (default: false)') + parser.add_argument( + '-r', + '--record-ops', + action='store_true', + help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') + parser.add_argument( + '-s', + '--max-steps', + action='store', + default=1000, + type=int, + help='Maximum number of steps to run (default: 100)') + parser.add_argument( + '-t', + '--num-threads', + action='store', + default=5, + type=int, + help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args()