diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9e38e04b6392edf74a2d9c9648c5a8b7637bbe2a..98181180e200fade047cfcc8f1be68434220af13 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -42,6 +42,13 @@ import os import io import signal import traceback + +try: + import psutil +except: + print("Psutil module needed, please install: sudo pip3 install psutil") + sys.exit(-1) + # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -52,13 +59,12 @@ if sys.version_info[0] < 3: # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace gConfig = argparse.Namespace() # Dummy value, will be replaced later +gSvcMgr = None # TODO: refactor this hack, use dep injection logger = None - def runThread(wt: WorkerThread): wt.run() - class CrashGenError(Exception): def __init__(self, msg=None, errno=None): self.msg = msg @@ -69,8 +75,7 @@ class CrashGenError(Exception): class WorkerThread: - def __init__(self, pool: ThreadPool, tid, - tc: ThreadCoordinator, + def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, # te: TaskExecutor, ): # note: main thread context! # self._curStep = -1 @@ -131,18 +136,28 @@ class WorkerThread: # clean up if (gConfig.per_thread_db_connection): # type: ignore - self._dbConn.close() + if self._dbConn.isOpen: #sometimes it is not open + self._dbConn.close() + else: + logger.warning("Cleaning up worker thread, dbConn already closed") def _doTaskLoop(self): # while self._curStep < self._pool.maxSteps: # tc = ThreadCoordinator(None) while True: tc = self._tc # Thread Coordinator, the overall master - tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + try: + tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + except threading.BrokenBarrierError as err: # main thread timed out + print("_bto", end="") + logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out") + break + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): + print("_wts", end="") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break @@ -159,6 +174,7 @@ class WorkerThread: logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) self._dbInUse = False # there may be changes between steps + # print("_wtd", end=None) # worker thread died def verifyThreadSelf(self): # ensure we are called by this own thread if (threading.get_ident() != self._thread.ident): @@ -187,30 +203,24 @@ class WorkerThread: # self._curStep += 1 # off to a new step... def tapStepGate(self): # give it a tap, release the thread waiting there - self.verifyThreadAlive() + # self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) - self._stepGate.set() # wake up! - time.sleep(0) # let the released thread run a bit + if self._thread.is_alive(): + logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) + self._stepGate.set() # wake up! + time.sleep(0) # let the released thread run a bit + else: + print("_tad", end="") # Thread already dead def execSql(self, sql): # TODO: expose DbConn directly - if (gConfig.per_thread_db_connection): - return self._dbConn.execute(sql) - else: - return self._tc.getDbManager().getDbConn().execute(sql) + return self.getDbConn().execute(sql) def querySql(self, sql): # TODO: expose DbConn directly - if (gConfig.per_thread_db_connection): - return self._dbConn.query(sql) - else: - return self._tc.getDbManager().getDbConn().query(sql) + return self.getDbConn().query(sql) def getQueryResult(self): - if (gConfig.per_thread_db_connection): - return self._dbConn.getQueryResult() - else: - return self._tc.getDbManager().getDbConn().getQueryResult() + return self.getDbConn().getQueryResult() def getDbConn(self): if (gConfig.per_thread_db_connection): @@ -228,6 +238,8 @@ class WorkerThread: class ThreadCoordinator: + WORKER_THREAD_TIMEOUT = 30 + def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool @@ -248,14 +260,14 @@ class ThreadCoordinator: def getDbManager(self) -> DbManager: return self._dbManager - def crossStepBarrier(self): - self._stepBarrier.wait() + def crossStepBarrier(self, timeout=None): + self._stepBarrier.wait(timeout) def requestToStop(self): self._runStatus = MainExec.STATUS_STOPPING self._execStats.registerFailure("User Interruption") - def _runShouldEnd(self, transitionFailed, hasAbortedTask): + def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): maxSteps = gConfig.max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True @@ -265,6 +277,8 @@ class ThreadCoordinator: return True if hasAbortedTask: return True + if workerTimeout: + return True return False def _hasAbortedTask(self): # from execution of previous step @@ -296,7 +310,7 @@ class ThreadCoordinator: # let other threads go past the pool barrier, but wait at the # thread gate logger.debug("[TRD] Main thread about to cross the barrier") - self.crossStepBarrier() + self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT) self._stepBarrier.reset() # Other worker threads should now be at the "gate" logger.debug("[TRD] Main thread finished crossing the barrier") @@ -327,6 +341,7 @@ class ThreadCoordinator: # end, and maybe signal them to stop else: raise + return transitionFailed self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step @@ -342,11 +357,21 @@ class ThreadCoordinator: self._execStats.startExec() # start the stop watch transitionFailed = False hasAbortedTask = False - while not self._runShouldEnd(transitionFailed, hasAbortedTask): + workerTimeout = False + while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): if not gConfig.debug: # print this only if we are not in debug mode print(".", end="", flush=True) - self._syncAtBarrier() # For now just cross the barrier + try: + self._syncAtBarrier() # For now just cross the barrier + except threading.BrokenBarrierError as err: + logger.info("Main loop aborted, caused by worker thread time-out") + self._execStats.registerFailure("Aborted due to worker thread timeout") + print("\n\nWorker Thread time-out detected, important thread info:") + ts = ThreadStacks() + ts.print(filterInternal=True) + workerTimeout = True + break # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # We use this period to do house keeping work, when all worker @@ -358,12 +383,20 @@ class ThreadCoordinator: break # do transition only if tasks are error free # Ending previous step - transitionFailed = self._doTransition() # To start, we end step -1 first + try: + transitionFailed = self._doTransition() # To start, we end step -1 first + except taos.error.ProgrammingError as err: + transitionFailed = True + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + logger.info("Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)) + # Then we move on to the next step self._releaseAllWorkerThreads(transitionFailed) if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" logger.debug("Abnormal ending of main thraed") + elif workerTimeout: + logger.debug("Abnormal ending of main thread, due to worker timeout") else: # regular ending, workers waiting at "barrier" logger.debug("Regular ending, main thread waiting for all worker threads to stop...") self._syncAtBarrier() @@ -561,6 +594,10 @@ class DbConn: def __init__(self): self.isOpen = False self._type = self.TYPE_INVALID + self._lastSql = None + + def getLastSql(self): + return self._lastSql def open(self): if (self.isOpen): @@ -569,9 +606,7 @@ class DbConn: # below implemented by child classes self.openByType() - logger.debug( - "[DB] data connection opened, type = {}".format( - self._type)) + logger.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True def resetDb(self): # reset the whole database, etc. @@ -594,21 +629,29 @@ class DbConn: def _queryAny(self, sql): # actual query result as an int if (not self.isOpen): - raise RuntimeError( - "Cannot query database until connection is open") + raise RuntimeError("Cannot query database until connection is open") nRows = self.query(sql) if nRows != 1: - raise RuntimeError( - "Unexpected result for query: {}, rows = {}".format( - sql, nRows)) + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) if self.getResultRows() != 1 or self.getResultCols() != 1: - raise RuntimeError( - "Unexpected result set for query: {}".format(sql)) + raise RuntimeError("Unexpected result set for query: {}".format(sql)) return self.getQueryResult()[0][0] + def use(self, dbName): + self.execute("use {}".format(dbName)) + + def hasDatabases(self): + return self.query("show databases") > 0 + + def hasTables(self): + return self.query("show tables") > 0 + def execute(self, sql): raise RuntimeError("Unexpected execution, should be overriden") + def query(self, sql) -> int: # return num rows returned + raise RuntimeError("Unexpected execution, should be overriden") + def openByType(self): raise RuntimeError("Unexpected execution, should be overriden") @@ -643,10 +686,11 @@ class DbConnRest(DbConn): self.isOpen = False def _doSql(self, sql): + self._lastSql = sql # remember this, last SQL attempted try: r = requests.post(self._url, data = sql, - auth = HTTPBasicAuth('root', 'taosdata')) + auth = HTTPBasicAuth('root', 'taosdata')) except: print("REST API Failure (TODO: more info here)") raise @@ -742,11 +786,16 @@ class MyTDSql: class DbConnNative(DbConn): + # Class variables + _lock = threading.Lock() + _connInfoDisplayed = False + def __init__(self): super().__init__() self._type = self.TYPE_NATIVE self._conn = None self._cursor = None + def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -755,31 +804,32 @@ class DbConnNative(DbConn): else: projPath = selfPath[:selfPath.find("tests")] + buildPath = None for root, dirs, files in os.walk(projPath): if ("taosd" in files): rootRealPath = os.path.dirname(os.path.realpath(root)) if ("packaging" not in rootRealPath): buildPath = root[:len(root) - len("/build/bin")] break + if buildPath == None: + raise RuntimeError("Failed to determine buildPath, selfPath={}".format(self_path)) return buildPath - connInfoDisplayed = False + def openByType(self): # Open connection cfgPath = self.getBuildPath() + "/test/cfg" hostAddr = "127.0.0.1" - if not self.connInfoDisplayed: - logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) - self.connInfoDisplayed = True - - self._conn = taos.connect( - host=hostAddr, - config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() - # Get the connection/cursor ready + with self._lock: # force single threading for opening DB connections + if not self._connInfoDisplayed: + self.__class__._connInfoDisplayed = True # updating CLASS variable + logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + + self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + self._cursor.execute('reset query cache') # self._cursor.execute('use db') # do this at the beginning of every - # step # Open connection self._tdSql = MyTDSql() @@ -984,29 +1034,11 @@ class StateDbOnly(AnyState): if (not self.hasTask(tasks, TaskCreateDb)): # only if we don't create any more self.assertAtMostOneSuccess(tasks, TaskDropDb) - self.assertIfExistThenSuccess(tasks, TaskDropDb) - # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases - # Nothing to be said about adding data task - # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB - # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - # self.assertAtMostOneSuccess(tasks, DropDbTask) - # self._state = self.STATE_EMPTY - # if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success - # # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - # if ( not self.hasTask(tasks, TaskDropSuperTable) ): - # self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything - # self.assertNoTask(tasks, DropDbTask) # should have have tried - # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet - # # can't say there's add-data attempts, since they may all fail - # self._state = self.STATE_TABLE_ONLY - # else: - # self._state = self.STATE_HAS_DATA - # What about AddFixedData? - # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): - # self._state = self.STATE_HAS_DATA - # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail - # # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, - # self._state = self.STATE_DB_ONLY # no change + + # TODO: restore the below, the problem exists, although unlikely in real-world + # if (gSvcMgr!=None) and gSvcMgr.isRestarting(): + # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) : + # self.assertIfExistThenSuccess(tasks, TaskDropDb) class StateSuperTableOnly(AnyState): @@ -1082,7 +1114,7 @@ class StateMechine: self._curState = self._findCurrentState() # starting state # transitition target probabilities, indexed with value of STATE_EMPTY, # STATE_DB_ONLY, etc. - self._stateWeights = [1, 3, 5, 15] + self._stateWeights = [1, 2, 10, 40] def getCurrentState(self): return self._curState @@ -1128,33 +1160,22 @@ class StateMechine: def _findCurrentState(self): dbc = self._dbConn ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state - if dbc.query("show databases") == 0: # no database?! - # logger.debug("Found EMPTY state") - logger.debug( - "[STT] empty database found, between {} and {}".format( - ts, time.time())) + if not dbc.hasDatabases(): # no database?! + logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own - dbc.execute("use db") - if dbc.query("show tables") == 0: # no tables - # logger.debug("Found DB ONLY state") - logger.debug( - "[STT] DB_ONLY found, between {} and {}".format( - ts, time.time())) + dbc.use("db") + if not dbc.hasTables(): # no tables + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() - if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) - ) == 0: # no regular tables - # logger.debug("Found TABLE_ONLY state") - logger.debug( - "[STT] SUPER_TABLE_ONLY found, between {} and {}".format( - ts, time.time())) + + sTable = DbManager.getFixedSuperTable() + if sTable.hasRegTables(dbc): # no regular tables + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) return StateSuperTableOnly() else: # has actual tables - # logger.debug("Found HAS_DATA state") - logger.debug( - "[STT] HAS_DATA found, between {} and {}".format( - ts, time.time())) + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) return StateHasData() def transition(self, tasks): @@ -1172,7 +1193,8 @@ class StateMechine: # case of multiple creation and drops if self._curState.canDropDb(): - self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) + if gSvcMgr == None: # only if we are running as client-only + self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in # case of drop-create-drop @@ -1300,13 +1322,17 @@ class DbManager(): def getFixedSuperTableName(cls): return "fs_table" + @classmethod + def getFixedSuperTable(cls): + return TdSuperTable(cls.getFixedSuperTableName()) + def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) def getNextTick(self): with self._lock: # prevent duplicate tick - if Dice.throw(10) == 0: # 1 in 10 chance - return self._lastTick + datetime.timedelta(0, -100) + if Dice.throw(20) == 0: # 1 in 20 chance + return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds else: # regular # add one second to it self._lastTick += datetime.timedelta(0, 1) @@ -1322,7 +1348,9 @@ class DbManager(): self.getNextInt()) def getNextFloat(self): - return 0.9 + self.getNextInt() + ret = 0.9 + self.getNextInt() + # print("Float obtained: {}".format(ret)) + return ret def getTableNameToDelete(self): tblNum = self.tableNumQueue.pop() # TODO: race condition! @@ -1340,33 +1368,35 @@ class TaskExecutor(): def __init__(self, size=10): self._size = size self._list = [] + self._lock = threading.Lock() def add(self, n: int): - if not self._list: # empty - self._list.append(n) - return - # now we should insert - nItems = len(self._list) - insPos = 0 - for i in range(nItems): - insPos = i - if n <= self._list[i]: # smaller than this item, time to insert - break # found the insertion point - insPos += 1 # insert to the right - - if insPos == 0: # except for the 1st item, # TODO: elimiate first item as gating item - return # do nothing - - # print("Inserting at postion {}, value: {}".format(insPos, n)) - self._list.insert(insPos, n) # insert - - newLen = len(self._list) - if newLen <= self._size: - return # do nothing - elif newLen == (self._size + 1): - del self._list[0] # remove the first item - else: - raise RuntimeError("Corrupt Bounded List") + with self._lock: + if not self._list: # empty + self._list.append(n) + return + # now we should insert + nItems = len(self._list) + insPos = 0 + for i in range(nItems): + insPos = i + if n <= self._list[i]: # smaller than this item, time to insert + break # found the insertion point + insPos += 1 # insert to the right + + if insPos == 0: # except for the 1st item, # TODO: elimiate first item as gating item + return # do nothing + + # print("Inserting at postion {}, value: {}".format(insPos, n)) + self._list.insert(insPos, n) # insert + + newLen = len(self._list) + if newLen <= self._size: + return # do nothing + elif newLen == (self._size + 1): + del self._list[0] # remove the first item + else: + raise RuntimeError("Corrupt Bounded List") def __str__(self): return repr(self._list) @@ -1419,7 +1449,6 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats - self._lastSql = "" # last SQL executed/attempted def isSuccess(self): return self._err is None @@ -1446,6 +1475,39 @@ class Task(): "To be implemeted by child classes, class name: {}".format( self.__class__.__name__)) + def _isErrAcceptable(self, errno, msg): + if errno in [ + 0x05, # TSDB_CODE_RPC_NOT_READY + # 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x360, 0x362, + 0x369, # tag already exists + 0x36A, 0x36B, 0x36D, + 0x381, + 0x380, # "db not selected" + 0x383, + 0x386, # DB is being dropped?! + 0x503, + 0x510, # vnode not in ready state + 0x600, + 1000 # REST catch-all error + ]: + return True # These are the ALWAYS-ACCEPTABLE ones + elif (errno in [ 0x0B ]) and gConfig.auto_start_service: + return True # We may get "network unavilable" when restarting service + elif errno == 0x200 : # invalid SQL, we need to div in a bit more + if msg.find("invalid column name") != -1: + return True + elif msg.find("tags number not matched") != -1: # mismatched tags after modification + return True + elif msg.find("duplicated column names") != -1: # also alter table tag issues + return True + elif (gSvcMgr!=None) and gSvcMgr.isRestarting(): + logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg)) + return True + + return False # Not an acceptable error + + def execute(self, wt: WorkerThread): wt.verifyThreadSelf() self._workerThread = wt # type: ignore @@ -1456,36 +1518,25 @@ class Task(): "[-] executing task {}...".format(self.__class__.__name__)) self._err = None - self._execStats.beginTaskType( - self.__class__.__name__) # mark beginning + self._execStats.beginTaskType(self.__class__.__name__) # mark beginning + errno2 = None try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = err.errno if ( - err.errno > 0) else 0x80000000 + err.errno # correct error scheme + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme if (gConfig.continue_on_exception): # user choose to continue - self.logDebug( - "[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql)) + self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( + errno2, err, wt.getDbConn().getLastSql())) self._err = err - elif (errno2 in [ - 0x05, # TSDB_CODE_RPC_NOT_READY - 0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, - 0x381, 0x380, 0x383, - 0x386, # DB is being dropped?! - 0x503, - 0x510, # vnode not in ready state - 0x600, - 1000 # REST catch-all error - ]): # allowed errors - self.logDebug( - "[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql)) + elif self._isErrAcceptable(errno2, err.__str__()): + self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( + errno2, err, wt.getDbConn().getLastSql())) print("_", end="", flush=True) self._err = err - else: - errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql) + else: # not an acceptable error + errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( + self.__class__.__name__, + errno2, err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) if gConfig.debug: # raise # so that we see full stack @@ -1509,25 +1560,22 @@ class Task(): except BaseException: self.logDebug( "[=] Unexpected exception, SQL: {}".format( - self._lastSql)) + wt.getDbConn().getLastSql())) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self.logDebug("[X] task execution completed, {}, status: {}".format( self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) # TODO: merge with above. - self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) + self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2) def execSql(self, sql): - self._lastSql = sql return self._dbManager.execute(sql) def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread - self._lastSql = sql return wt.execSql(sql) def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread - self._lastSql = sql return wt.querySql(sql) def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread @@ -1542,6 +1590,7 @@ class ExecutionStats: self._lock = threading.Lock() self._firstTaskStartTime = None self._execStartTime = None + self._errors = {} self._elapsedTime = 0.0 # total elapsed time self._accRunTime = 0.0 # accumulated run time @@ -1561,13 +1610,18 @@ class ExecutionStats: def endExec(self): self._elapsedTime = time.time() - self._execStartTime - def incExecCount(self, klassName, isSuccess): # TODO: add a lock here + def incExecCount(self, klassName, isSuccess, eno=None): # TODO: add a lock here if klassName not in self._execTimes: self._execTimes[klassName] = [0, 0] t = self._execTimes[klassName] # tuple for the data t[0] += 1 # index 0 has the "total" execution times if isSuccess: t[1] += 1 # index 1 has the "success" execution times + if eno != None: + if klassName not in self._errors: + self._errors[klassName] = {} + errors = self._errors[klassName] + errors[eno] = errors[eno]+1 if eno in errors else 1 def beginTaskType(self, klassName): with self._lock: @@ -1597,7 +1651,14 @@ class ExecutionStats: execTimesAny = 0 for k, n in self._execTimes.items(): execTimesAny += n[0] - logger.info("| {0:<24}: {1}/{2}".format(k, n[1], n[0])) + errStr = None + if k in self._errors: + errors = self._errors[k] + # print("errors = {}".format(errors)) + errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()] + # print("error strings = {}".format(errStrs)) + errStr = ", ".join(errStrs) + logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr)) logger.info( "| Total Tasks Executed (success or not): {} ".format(execTimesAny)) @@ -1649,7 +1710,7 @@ class StateTransitionTask(Task): @classmethod def getRegTableName(cls, i): - return "db.reg_table_{}".format(i) + return "reg_table_{}".format(i) def execute(self, wt: WorkerThread): super().execute(wt) @@ -1696,15 +1757,94 @@ class TaskCreateSuperTable(StateTransitionTask): logger.debug("Skipping task, no DB yet") return - tblName = self._dbManager.getFixedSuperTableName() + sTable = self._dbManager.getFixedSuperTable() # wt.execSql("use db") # should always be in place - self.execWtSql( - wt, - "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) + sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) + # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically +class TdSuperTable: + def __init__(self, stName): + self._stName = stName + + def create(self, dbc, cols: dict, tags: dict): + sql = "CREATE TABLE db.{} ({}) TAGS ({})".format( + self._stName, + ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]), + ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) + ) + dbc.execute(sql) + + def getRegTables(self, dbc: DbConn): + try: + dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) + raise + + qr = dbc.getQueryResult() + return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation + + def hasRegTables(self, dbc: DbConn): + return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0 + + def ensureTable(self, dbc: DbConn, regTableName: str): + sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName) + if dbc.query(sql) >= 1 : # reg table exists already + return + sql = "CREATE TABLE {} USING {} tags ({})".format( + regTableName, self._stName, self._getTagStrForSql(dbc) + ) + dbc.execute(sql) + + def _getTagStrForSql(self, dbc) : + tags = self._getTags(dbc) + tagStrs = [] + for tagName in tags: + tagType = tags[tagName] + if tagType == 'BINARY': + tagStrs.append("'Beijing-Shanghai-LosAngeles'") + elif tagType == 'FLOAT': + tagStrs.append('9.9') + elif tagType == 'INT': + tagStrs.append('88') + else: + raise RuntimeError("Unexpected tag type: {}".format(tagType)) + return ", ".join(tagStrs) + + def _getTags(self, dbc) -> dict: + dbc.query("DESCRIBE {}".format(self._stName)) + stCols = dbc.getQueryResult() + # print(stCols) + ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type + # print("Tags retrieved: {}".format(ret)) + return ret + + def addTag(self, dbc, tagName, tagType): + if tagName in self._getTags(dbc): # already + return + # sTable.addTag("extraTag", "int") + sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType) + dbc.execute(sql) + + def dropTag(self, dbc, tagName): + if not tagName in self._getTags(dbc): # don't have this tag + return + sql = "alter table db.{} drop tag {}".format(self._stName, tagName) + dbc.execute(sql) + + def changeTag(self, dbc, oldTag, newTag): + tags = self._getTags(dbc) + if not oldTag in tags: # don't have this tag + return + if newTag in tags: # already have this tag + return + sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag) + dbc.execute(sql) + class TaskReadData(StateTransitionTask): @classmethod def getEndState(cls): @@ -1715,23 +1855,24 @@ class TaskReadData(StateTransitionTask): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTbName = self._dbManager.getFixedSuperTableName() - self.queryWtSql(wt, "select TBNAME from db.{}".format( - sTbName)) # TODO: analyze result set later + sTable = self._dbManager.getFixedSuperTable() if random.randrange( 5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations wt.getDbConn().close() wt.getDbConn().open() - else: - # wt.getDbConn().getQueryResult() - rTables = self.getQueryResult(wt) - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables: # regular tables - self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) - - # tdSql.query(" cars where tbname in ('carzero', 'carone')") - + + for rTbName in sTable.getRegTables(wt.getDbConn()): # regular tables + aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)', + # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable + 'sum(speed)', 'stddev(speed)', + 'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)']) # TODO: add more from 'top' + try: + self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName)) + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, wt.getDbConn().getLastSql())) + raise class TaskDropSuperTable(StateTransitionTask): @classmethod @@ -1789,20 +1930,55 @@ class TaskAlterTags(StateTransitionTask): return state.canDropFixedSuperTable() # if we can drop it, we can alter tags def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() + # tblName = self._dbManager.getFixedSuperTableName() + dbc = wt.getDbConn() + sTable = self._dbManager.getFixedSuperTable() dice = Dice.throw(4) if dice == 0: - sql = "alter table db.{} add tag extraTag int".format(tblName) + sTable.addTag(dbc, "extraTag", "int") + # sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1: - sql = "alter table db.{} drop tag extraTag".format(tblName) + sTable.dropTag(dbc, "extraTag") + # sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2: - sql = "alter table db.{} drop tag newTag".format(tblName) + sTable.dropTag(dbc, "newTag") + # sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - sql = "alter table db.{} change tag extraTag newTag".format( - tblName) + sTable.changeTag(dbc, "extraTag", "newTag") + # sql = "alter table db.{} change tag extraTag newTag".format(tblName) - self.execWtSql(wt, sql) +class TaskRestartService(StateTransitionTask): + _isRunning = False + _classLock = threading.Lock() + @classmethod + def getEndState(cls): + return None # meaning doesn't affect state + + @classmethod + def canBeginFrom(cls, state: AnyState): + if gConfig.auto_start_service: + return state.canDropFixedSuperTable() # Basicallly when we have the super table + return False # don't run this otherwise + + CHANCE_TO_RESTART_SERVICE = 100 + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + if not gConfig.auto_start_service: # only execute when we are in -a mode + print("_a", end="", flush=True) + return + + with self._classLock: + if self._isRunning: + print("Skipping restart task, another running already") + return + self._isRunning = True + + if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance + dbc = wt.getDbConn() + dbc.execute("show databases") # simple delay, align timing with other workers + gSvcMgr.restart() + + self._isRunning = False class TaskAddData(StateTransitionTask): # Track which table is being actively worked on @@ -1833,39 +2009,31 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbManager - # wt.execSql("use db") # TODO: seems to be an INSERT bug to require - # this - tblSeq = list( - range( + ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access + tblSeq = list(range( self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: if (i in self.activeTable): # wow already active - # logger.info("Concurrent data insertion into table: {}".format(i)) - # print("ct({})".format(i), end="", flush=True) # Concurrent - # insertion into table - print("x", end="", flush=True) + print("x", end="", flush=True) # concurrent insertion else: self.activeTable.add(i) # marking it active - # No need to shuffle data sequence, unless later we decide to do - # non-increment insertion - regTableName = self.getRegTableName( - i) # "db.reg_table_{}".format(i) - for j in range( - self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table + + sTable = ds.getFixedSuperTable() + regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) + sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists + + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table nextInt = ds.getNextInt() if gConfig.record_ops: self.prepToRecordOps() - self.fAddLogReady.write( - "Ready to write {} to {}\n".format( - nextInt, regTableName)) + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady) - sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) regTableName, - ds.getFixedSuperTableName(), - ds.getNextBinary(), ds.getNextFloat(), + # ds.getFixedSuperTableName(), + # ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) self.execWtSql(wt, sql) # Successfully wrote the data into the DB, let's record it @@ -1912,6 +2080,10 @@ class Dice(): raise RuntimeError("Cannot throw dice before seeding it") return random.randrange(start, stop) + @classmethod + def choice(cls, cList): + return random.choice(cList) + class LoggingFilter(logging.Filter): def filter(self, record: logging.LogRecord): @@ -1934,14 +2106,16 @@ class MyLoggingAdapter(logging.LoggerAdapter): class SvcManager: def __init__(self): print("Starting TDengine Service Manager") - signal.signal(signal.SIGTERM, self.sigIntHandler) - signal.signal(signal.SIGINT, self.sigIntHandler) - signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! + # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec + # signal.signal(signal.SIGINT, self.sigIntHandler) + # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! self.inSigHandler = False # self._status = MainExec.STATUS_RUNNING # set inside # _startTaosService() self.svcMgrThread = None + self._lock = threading.Lock() + self._isRestarting = False def _doMenu(self): choice = "" @@ -1976,23 +2150,22 @@ class SvcManager: self.sigHandlerResume() elif choice == "2": self.stopTaosService() - elif choice == "3": - self.stopTaosService() - self.startTaosService() + elif choice == "3": # Restart + self.restart() else: raise RuntimeError("Invalid menu choice: {}".format(choice)) self.inSigHandler = False def sigIntHandler(self, signalNumber, frame): - print("Sig INT Handler starting...") + print("SvcManager: INT Signal Handler starting...") if self.inSigHandler: print("Ignoring repeated SIG_INT...") return self.inSigHandler = True self.stopTaosService() - print("INT signal handler returning...") + print("SvcManager: INT Signal Handler returning...") self.inSigHandler = False def sigHandlerResume(self): @@ -2005,44 +2178,78 @@ class SvcManager: self.svcMgrThread = None # no more def _procIpcAll(self): - while self.svcMgrThread: # for as long as the svc mgr thread is still here - self.svcMgrThread.procIpcBatch() # regular processing, + while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here + if self.isRunning(): + self.svcMgrThread.procIpcBatch() # regular processing, + self._checkServiceManagerThread() + elif self.isRetarting(): + print("Service restarting...") time.sleep(0.5) # pause, before next round - self._checkServiceManagerThread() print( "Service Manager Thread (with subprocess) has ended, main thread now exiting...") def startTaosService(self): - if self.svcMgrThread: - raise RuntimeError( - "Cannot start TAOS service when one may already be running") - self.svcMgrThread = ServiceManagerThread() # create the object - self.svcMgrThread.start() - print("TAOS service started, printing out output...") - self.svcMgrThread.procIpcBatch( - trimToTarget=10, - forceOutput=True) # for printing 10 lines - print("TAOS service started") + with self._lock: + if self.svcMgrThread: + raise RuntimeError("Cannot start TAOS service when one may already be running") + + # Find if there's already a taosd service, and then kill it + for proc in psutil.process_iter(): + if proc.name() == 'taosd': + print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") + time.sleep(2.0) + proc.kill() + # print("Process: {}".format(proc.name())) + + self.svcMgrThread = ServiceManagerThread() # create the object + self.svcMgrThread.start() + print("Attempting to start TAOS service started, printing out output...") + self.svcMgrThread.procIpcBatch( + trimToTarget=10, + forceOutput=True) # for printing 10 lines + print("TAOS service started") def stopTaosService(self, outputLines=20): - print("Terminating Service Manager Thread (SMT) execution...") - if not self.svcMgrThread: - raise RuntimeError("Unexpected empty svc mgr thread") - self.svcMgrThread.stop() - if self.svcMgrThread.isStopped(): - self.svcMgrThread.procIpcBatch(outputLines) # one last time - self.svcMgrThread = None - print("----- End of TDengine Service Output -----\n") - print("SMT execution terminated") - else: - print("WARNING: SMT did not terminate as expected") + with self._lock: + if not self.isRunning(): + logger.warning("Cannot stop TAOS service, not running") + return + + print("Terminating Service Manager Thread (SMT) execution...") + self.svcMgrThread.stop() + if self.svcMgrThread.isStopped(): + self.svcMgrThread.procIpcBatch(outputLines) # one last time + self.svcMgrThread = None + print("----- End of TDengine Service Output -----\n") + print("SMT execution terminated") + else: + print("WARNING: SMT did not terminate as expected") def run(self): self.startTaosService() - self._procIpcAll() # pump/process all the messages - if self.svcMgrThread: # if sig handler hasn't destroyed it by now + self._procIpcAll() # pump/process all the messages, may encounter SIG + restart + if self.isRunning(): # if sig handler hasn't destroyed it by now self.stopTaosService() # should have started already + def restart(self): + if self._isRestarting: + logger.warning("Cannot restart service when it's already restarting") + return + + self._isRestarting = True + if self.isRunning(): + self.stopTaosService() + else: + logger.warning("Service not running when restart requested") + + self.startTaosService() + self._isRestarting = False + + def isRunning(self): + return self.svcMgrThread != None + + def isRestarting(self): + return self._isRestarting class ServiceManagerThread: MAX_QUEUE_SIZE = 10000 @@ -2094,6 +2301,7 @@ class ServiceManagerThread: logger.info("[] TDengine service READY to process requests") return # now we've started # TODO: handle this better? + self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output raise RuntimeError("TDengine service did not start successfully") def stop(self): @@ -2196,7 +2404,10 @@ class ServiceManagerThread: if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started if line.find(self.TD_READY_MSG) != -1: # found - self._status = MainExec.STATUS_RUNNING + logger.info("Waiting for the service to become FULLY READY") + time.sleep(1.0) # wait for the server to truly start. TODO: remove this + logger.info("Service is now FULLY READY") + self._status = MainExec.STATUS_RUNNING # Trim the queue if necessary: TODO: try this 1 out of 10 times self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size @@ -2242,6 +2453,21 @@ class TdeSubProcess: taosdPath = self.getBuildPath() + "/build/bin/taosd" cfgPath = self.getBuildPath() + "/test/cfg" + # Delete the log files + logPath = self.getBuildPath() + "/test/log" + # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 + # filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ] + # for f in filelist: + # filePath = os.path.join(logPath, f) + # print("Removing log file: {}".format(filePath)) + # os.remove(filePath) + if os.path.exists(logPath): + logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') + logger.info("Saving old log files to: {}".format(logPathSaved)) + os.rename(logPath, logPathSaved) + # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms + + svcCmd = [taosdPath, '-c', cfgPath] # svcCmd = ['vmstat', '1'] if self.subProcess: # already there @@ -2275,16 +2501,46 @@ class TdeSubProcess: print("TDengine service process terminated successfully from SIG_INT") self.subProcess = None +class ThreadStacks: # stack info for all threads + def __init__(self): + self._allStacks = {} + allFrames = sys._current_frames() + for th in threading.enumerate(): + stack = traceback.extract_stack(allFrames[th.ident]) + self._allStacks[th.native_id] = stack + + def print(self, filteredEndName = None, filterInternal = False): + for thNid, stack in self._allStacks.items(): # for each thread + lastFrame = stack[-1] + if filteredEndName: # we need to filter out stacks that match this name + if lastFrame.name == filteredEndName : # end did not match + continue + if filterInternal: + if lastFrame.name in ['wait', 'invoke_excepthook', + '_wait', # The Barrier exception + 'svcOutputReader', # the svcMgr thread + '__init__']: # the thread that extracted the stack + continue # ignore + # Now print + print("\n<----- Thread Info for ID: {}".format(thNid)) + for frame in stack: + # print(frame) + print("File {filename}, line {lineno}, in {name}".format( + filename=frame.filename, lineno=frame.lineno, name=frame.name)) + print(" {}".format(frame.line)) + print("-----> End of Thread Info\n") class ClientManager: def __init__(self): print("Starting service manager") - signal.signal(signal.SIGTERM, self.sigIntHandler) - signal.signal(signal.SIGINT, self.sigIntHandler) + # signal.signal(signal.SIGTERM, self.sigIntHandler) + # signal.signal(signal.SIGINT, self.sigIntHandler) self._status = MainExec.STATUS_RUNNING self.tc = None + self.inSigHandler = False + def sigIntHandler(self, signalNumber, frame): if self._status != MainExec.STATUS_RUNNING: print("Repeated SIGINT received, forced exit...") @@ -2292,9 +2548,50 @@ class ClientManager: sys.exit(-1) self._status = MainExec.STATUS_STOPPING # immediately set our status - print("Terminating program...") + print("ClientManager: Terminating program...") self.tc.requestToStop() + def _doMenu(self): + choice = "" + while True: + print("\nInterrupting Client Program, Choose an Action: ") + print("1: Resume") + print("2: Terminate") + print("3: Show Threads") + # Remember to update the if range below + # print("Enter Choice: ", end="", flush=True) + while choice == "": + choice = input("Enter Choice: ") + if choice != "": + break # done with reading repeated input + if choice in ["1", "2", "3"]: + break # we are done with whole method + print("Invalid choice, please try again.") + choice = "" # reset + return choice + + def sigUsrHandler(self, signalNumber, frame): + print("Interrupting main thread execution upon SIGUSR1") + if self.inSigHandler: # already + print("Ignoring repeated SIG_USR1...") + return # do nothing if it's already not running + self.inSigHandler = True + + choice = self._doMenu() + if choice == "1": + print("Resuming execution...") + time.sleep(1.0) + elif choice == "2": + print("Not implemented yet") + time.sleep(1.0) + elif choice == "3": + ts = ThreadStacks() + ts.print() + else: + raise RuntimeError("Invalid menu choice: {}".format(choice)) + + self.inSigHandler = False + def _printLastNumbers(self): # to verify data durability dbManager = DbManager(resetDb=False) dbc = dbManager.getDbConn() @@ -2327,21 +2624,17 @@ class ClientManager: def prepare(self): self._printLastNumbers() - def run(self): - if gConfig.auto_start_service: - svcMgr = SvcManager() - svcMgr.startTaosService() - + def run(self, svcMgr): self._printLastNumbers() dbManager = DbManager() # Regular function thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) - + self.tc.run() # print("exec stats: {}".format(self.tc.getExecStats())) # print("TC failed = {}".format(self.tc.isFailed())) - if gConfig.auto_start_service: + if svcMgr: # gConfig.auto_start_service: svcMgr.stopTaosService() # Print exec status, etc., AFTER showing messages from the server self.conclude() @@ -2353,25 +2646,58 @@ class ClientManager: self.tc.printStats() self.tc.getDbManager().cleanUp() - class MainExec: STATUS_STARTING = 1 STATUS_RUNNING = 2 STATUS_STOPPING = 3 STATUS_STOPPED = 4 - @classmethod - def runClient(cls): - clientManager = ClientManager() - return clientManager.run() + def __init__(self): + self._clientMgr = None + self._svcMgr = None - @classmethod - def runService(cls): - svcManager = SvcManager() - svcManager.run() + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! - @classmethod - def runTemp(cls): # for debugging purposes + def sigUsrHandler(self, signalNumber, frame): + if self._clientMgr: + self._clientMgr.sigUsrHandler(signalNumber, frame) + elif self._svcMgr: # Only if no client mgr, we are running alone + self._svcMgr.sigUsrHandler(signalNumber, frame) + + def sigIntHandler(self, signalNumber, frame): + if self._svcMgr: + self._svcMgr.sigIntHandler(signalNumber, frame) + if self._clientMgr: + self._clientMgr.sigIntHandler(signalNumber, frame) + + def runClient(self): + global gSvcMgr + if gConfig.auto_start_service: + self._svcMgr = SvcManager() + gSvcMgr = self._svcMgr # hack alert + self._svcMgr.startTaosService() # we start, don't run + + self._clientMgr = ClientManager() + ret = None + try: + ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside + except requests.exceptions.ConnectionError as err: + logger.warning("Failed to open REST connection to DB") + # don't raise + return ret + + def runService(self): + global gSvcMgr + self._svcMgr = SvcManager() + gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert + + self._svcMgr.run() # run to some end state + self._svcMgr = None + gSvcMgr = None + + def runTemp(self): # for debugging purposes # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix # dbc = dbState.getDbConn() # sTbName = dbState.getFixedSuperTableName() @@ -2527,10 +2853,11 @@ def main(): Dice.seed(0) # initial seeding of dice # Run server or client + mExec = MainExec() if gConfig.run_tdengine: # run server - MainExec.runService() + mExec.runService() else: - return MainExec.runClient() + return mExec.runClient() if __name__ == "__main__":