From 91c1fe45b6a6bb0f5118e0cc306be0d89ebe4d6f Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 24 Jul 2020 03:15:09 +0000 Subject: [PATCH] Added service restart to crash_gen tool, discovered meter id mismatch problem --- tests/pytest/crash_gen.py | 309 +++++++++++++++++++++++++++----------- 1 file changed, 219 insertions(+), 90 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index f43e746a71..97d7f2c79e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -59,13 +59,12 @@ if sys.version_info[0] < 3: # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace gConfig = argparse.Namespace() # Dummy value, will be replaced later +gSvcMgr = None # TODO: refactor this hack, use dep injection logger = None - def runThread(wt: WorkerThread): wt.run() - class CrashGenError(Exception): def __init__(self, msg=None, errno=None): self.msg = msg @@ -206,22 +205,13 @@ class WorkerThread: time.sleep(0) # let the released thread run a bit def execSql(self, sql): # TODO: expose DbConn directly - if (gConfig.per_thread_db_connection): - return self._dbConn.execute(sql) - else: - return self._tc.getDbManager().getDbConn().execute(sql) + return self.getDbConn().execute(sql) def querySql(self, sql): # TODO: expose DbConn directly - if (gConfig.per_thread_db_connection): - return self._dbConn.query(sql) - else: - return self._tc.getDbManager().getDbConn().query(sql) + return self.getDbConn().query(sql) def getQueryResult(self): - if (gConfig.per_thread_db_connection): - return self._dbConn.getQueryResult() - else: - return self._tc.getDbManager().getDbConn().getQueryResult() + return self.getDbConn().getQueryResult() def getDbConn(self): if (gConfig.per_thread_db_connection): @@ -239,6 +229,8 @@ class WorkerThread: class ThreadCoordinator: + WORKER_THREAD_TIMEOUT = 30 + def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool @@ -309,7 +301,7 @@ class ThreadCoordinator: # let other threads go past the pool barrier, but wait at the # thread gate logger.debug("[TRD] Main thread about to cross the barrier") - self.crossStepBarrier(timeout=15) + self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT) self._stepBarrier.reset() # Other worker threads should now be at the "gate" logger.debug("[TRD] Main thread finished crossing the barrier") @@ -586,6 +578,10 @@ class DbConn: def __init__(self): self.isOpen = False self._type = self.TYPE_INVALID + self._lastSql = None + + def getLastSql(self): + return self._lastSql def open(self): if (self.isOpen): @@ -674,10 +670,11 @@ class DbConnRest(DbConn): self.isOpen = False def _doSql(self, sql): + self._lastSql = sql # remember this, last SQL attempted try: r = requests.post(self._url, data = sql, - auth = HTTPBasicAuth('root', 'taosdata')) + auth = HTTPBasicAuth('root', 'taosdata')) except: print("REST API Failure (TODO: more info here)") raise @@ -1116,7 +1113,7 @@ class StateMechine: self._curState = self._findCurrentState() # starting state # transitition target probabilities, indexed with value of STATE_EMPTY, # STATE_DB_ONLY, etc. - self._stateWeights = [1, 3, 5, 15] + self._stateWeights = [1, 2, 10, 40] def getCurrentState(self): return self._curState @@ -1332,8 +1329,8 @@ class DbManager(): def getNextTick(self): with self._lock: # prevent duplicate tick - if Dice.throw(10) == 0: # 1 in 10 chance - return self._lastTick + datetime.timedelta(0, -100) + if Dice.throw(20) == 0: # 1 in 20 chance + return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds else: # regular # add one second to it self._lastTick += datetime.timedelta(0, 1) @@ -1448,7 +1445,6 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats - self._lastSql = "" # last SQL executed/attempted def isSuccess(self): return self._err is None @@ -1492,6 +1488,8 @@ class Task(): 1000 # REST catch-all error ]: return True # These are the ALWAYS-ACCEPTABLE ones + elif (errno in [ 0x0B ]) and gConfig.auto_start_service: + return True # We may get "network unavilable" when restarting service elif errno == 0x200 : # invalid SQL, we need to div in a bit more if msg.find("invalid column name") != -1: return True @@ -1513,8 +1511,8 @@ class Task(): "[-] executing task {}...".format(self.__class__.__name__)) self._err = None - self._execStats.beginTaskType( - self.__class__.__name__) # mark beginning + self._execStats.beginTaskType(self.__class__.__name__) # mark beginning + errno2 = None try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: @@ -1522,16 +1520,17 @@ class Task(): err.errno > 0) else 0x80000000 + err.errno # correct error scheme if (gConfig.continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql)) + errno2, err, wt.getDbConn().getLastSql())) self._err = err elif self._isErrAcceptable(errno2, err.__str__()): self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql)) + errno2, err, wt.getDbConn().getLastSql())) print("_", end="", flush=True) self._err = err else: # not an acceptable error - errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( - errno2, err, self._lastSql) + errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( + self.__class__.__name__, + errno2, err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) if gConfig.debug: # raise # so that we see full stack @@ -1555,25 +1554,22 @@ class Task(): except BaseException: self.logDebug( "[=] Unexpected exception, SQL: {}".format( - self._lastSql)) + wt.getDbConn().getLastSql())) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self.logDebug("[X] task execution completed, {}, status: {}".format( self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) # TODO: merge with above. - self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) + self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2) def execSql(self, sql): - self._lastSql = sql return self._dbManager.execute(sql) def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread - self._lastSql = sql return wt.execSql(sql) def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread - self._lastSql = sql return wt.querySql(sql) def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread @@ -1588,6 +1584,7 @@ class ExecutionStats: self._lock = threading.Lock() self._firstTaskStartTime = None self._execStartTime = None + self._errors = {} self._elapsedTime = 0.0 # total elapsed time self._accRunTime = 0.0 # accumulated run time @@ -1607,13 +1604,18 @@ class ExecutionStats: def endExec(self): self._elapsedTime = time.time() - self._execStartTime - def incExecCount(self, klassName, isSuccess): # TODO: add a lock here + def incExecCount(self, klassName, isSuccess, eno=None): # TODO: add a lock here if klassName not in self._execTimes: self._execTimes[klassName] = [0, 0] t = self._execTimes[klassName] # tuple for the data t[0] += 1 # index 0 has the "total" execution times if isSuccess: t[1] += 1 # index 1 has the "success" execution times + if eno != None: + if klassName not in self._errors: + self._errors[klassName] = {} + errors = self._errors[klassName] + errors[eno] = errors[eno]+1 if eno in errors else 1 def beginTaskType(self, klassName): with self._lock: @@ -1643,7 +1645,14 @@ class ExecutionStats: execTimesAny = 0 for k, n in self._execTimes.items(): execTimesAny += n[0] - logger.info("| {0:<24}: {1}/{2}".format(k, n[1], n[0])) + errStr = None + if k in self._errors: + errors = self._errors[k] + # print("errors = {}".format(errors)) + errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()] + # print("error strings = {}".format(errStrs)) + errStr = ", ".join(errStrs) + logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr)) logger.info( "| Total Tasks Executed (success or not): {} ".format(execTimesAny)) @@ -1742,11 +1751,10 @@ class TaskCreateSuperTable(StateTransitionTask): logger.debug("Skipping task, no DB yet") return - tblName = self._dbManager.getFixedSuperTableName() + sTable = self._dbManager.getFixedSuperTable() # wt.execSql("use db") # should always be in place - self.execWtSql( - wt, - "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) + sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) + # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically @@ -1755,6 +1763,14 @@ class TdSuperTable: def __init__(self, stName): self._stName = stName + def create(self, dbc, cols: dict, tags: dict): + sql = "CREATE TABLE db.{} ({}) TAGS ({})".format( + self._stName, + ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]), + ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) + ) + dbc.execute(sql) + def getRegTables(self, dbc: DbConn): try: dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later @@ -1773,12 +1789,56 @@ class TdSuperTable: sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already return - sql = "CREATE TABLE {} USING {} tags ('{}', {})".format( - regTableName, self._stName, - 'xyz', '33' + sql = "CREATE TABLE {} USING {} tags ({})".format( + regTableName, self._stName, self._getTagStrForSql(dbc) ) dbc.execute(sql) + def _getTagStrForSql(self, dbc) : + tags = self._getTags(dbc) + tagStrs = [] + for tagName in tags: + tagType = tags[tagName] + if tagType == 'BINARY': + tagStrs.append("'Beijing-Shanghai-LosAngeles'") + elif tagType == 'FLOAT': + tagStrs.append('9.9') + elif tagType == 'INT': + tagStrs.append('88') + else: + raise RuntimeError("Unexpected tag type: {}".format(tagType)) + return ", ".join(tagStrs) + + def _getTags(self, dbc) -> dict: + dbc.query("DESCRIBE {}".format(self._stName)) + stCols = dbc.getQueryResult() + # print(stCols) + ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type + # print("Tags retrieved: {}".format(ret)) + return ret + + def addTag(self, dbc, tagName, tagType): + if tagName in self._getTags(dbc): # already + return + # sTable.addTag("extraTag", "int") + sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType) + dbc.execute(sql) + + def dropTag(self, dbc, tagName): + if not tagName in self._getTags(dbc): # don't have this tag + return + sql = "alter table db.{} drop tag {}".format(self._stName, tagName) + dbc.execute(sql) + + def changeTag(self, dbc, oldTag, newTag): + tags = self._getTags(dbc) + if not oldTag in tags: # don't have this tag + return + if newTag in tags: # already have this tag + return + sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag) + dbc.execute(sql) + class TaskReadData(StateTransitionTask): @classmethod def getEndState(cls): @@ -1796,7 +1856,7 @@ class TaskReadData(StateTransitionTask): wt.getDbConn().close() wt.getDbConn().open() - for rTbName in sTable.getRegTables(self._dbManager.getDbConn()): # regular tables + for rTbName in sTable.getRegTables(wt.getDbConn()): # regular tables aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)', # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable 'sum(speed)', 'stddev(speed)', @@ -1805,7 +1865,7 @@ class TaskReadData(StateTransitionTask): self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName)) except taos.error.ProgrammingError as err: errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno - logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, wt.getDbConn().getLastSql())) raise class TaskDropSuperTable(StateTransitionTask): @@ -1864,20 +1924,54 @@ class TaskAlterTags(StateTransitionTask): return state.canDropFixedSuperTable() # if we can drop it, we can alter tags def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() + # tblName = self._dbManager.getFixedSuperTableName() + dbc = wt.getDbConn() + sTable = self._dbManager.getFixedSuperTable() dice = Dice.throw(4) if dice == 0: - sql = "alter table db.{} add tag extraTag int".format(tblName) + sTable.addTag(dbc, "extraTag", "int") + # sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1: - sql = "alter table db.{} drop tag extraTag".format(tblName) + sTable.dropTag(dbc, "extraTag") + # sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2: - sql = "alter table db.{} drop tag newTag".format(tblName) + sTable.dropTag(dbc, "newTag") + # sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - sql = "alter table db.{} change tag extraTag newTag".format( - tblName) + sTable.changeTag(dbc, "extraTag", "newTag") + # sql = "alter table db.{} change tag extraTag newTag".format(tblName) + +class TaskRestartService(StateTransitionTask): + _isRunning = False + _classLock = threading.Lock() + + @classmethod + def getEndState(cls): + return None # meaning doesn't affect state + + @classmethod + def canBeginFrom(cls, state: AnyState): + if gConfig.auto_start_service: + return state.canDropFixedSuperTable() # Basicallly when we have the super table + return False # don't run this otherwise + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + if not gConfig.auto_start_service: # only execute when we are in -a mode + print("_a", end="", flush=True) + return - self.execWtSql(wt, sql) + with self._classLock: + if self._isRunning: + print("Skipping restart task, another running already") + return + self._isRunning = True + + if Dice.throw(50) == 0: # 1 in N chance + dbc = wt.getDbConn() + dbc.execute("show databases") # simple delay, align timing with other workers + gSvcMgr.restart() + self._isRunning = False class TaskAddData(StateTransitionTask): # Track which table is being actively worked on @@ -1908,7 +2002,7 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbManager + ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access tblSeq = list(range( self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) @@ -1920,7 +2014,7 @@ class TaskAddData(StateTransitionTask): sTable = ds.getFixedSuperTable() regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) - sTable.ensureTable(ds.getDbConn(), regTableName) # Ensure the table exists + sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table nextInt = ds.getNextInt() @@ -2013,6 +2107,8 @@ class SvcManager: # self._status = MainExec.STATUS_RUNNING # set inside # _startTaosService() self.svcMgrThread = None + self._lock = threading.Lock() + self._isRestarting = False def _doMenu(self): choice = "" @@ -2047,9 +2143,8 @@ class SvcManager: self.sigHandlerResume() elif choice == "2": self.stopTaosService() - elif choice == "3": - self.stopTaosService() - self.startTaosService() + elif choice == "3": # Restart + self.restart() else: raise RuntimeError("Invalid menu choice: {}".format(choice)) @@ -2076,57 +2171,79 @@ class SvcManager: self.svcMgrThread = None # no more def _procIpcAll(self): - while self.svcMgrThread: # for as long as the svc mgr thread is still here - self.svcMgrThread.procIpcBatch() # regular processing, + while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here + if self.isRunning(): + self.svcMgrThread.procIpcBatch() # regular processing, + self._checkServiceManagerThread() + elif self.isRetarting(): + print("Service restarting...") time.sleep(0.5) # pause, before next round - self._checkServiceManagerThread() print( "Service Manager Thread (with subprocess) has ended, main thread now exiting...") def startTaosService(self): - if self.svcMgrThread: - raise RuntimeError("Cannot start TAOS service when one may already be running") - - # Find if there's already a taosd service, and then kill it - for proc in psutil.process_iter(): - if proc.name() == 'taosd': - print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") - time.sleep(2.0) - proc.kill() - # print("Process: {}".format(proc.name())) - - self.svcMgrThread = ServiceManagerThread() # create the object - self.svcMgrThread.start() - print("TAOS service started, printing out output...") - self.svcMgrThread.procIpcBatch( - trimToTarget=10, - forceOutput=True) # for printing 10 lines - print("TAOS service started") + with self._lock: + if self.svcMgrThread: + raise RuntimeError("Cannot start TAOS service when one may already be running") + + # Find if there's already a taosd service, and then kill it + for proc in psutil.process_iter(): + if proc.name() == 'taosd': + print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") + time.sleep(2.0) + proc.kill() + # print("Process: {}".format(proc.name())) + + self.svcMgrThread = ServiceManagerThread() # create the object + self.svcMgrThread.start() + print("TAOS service started, printing out output...") + self.svcMgrThread.procIpcBatch( + trimToTarget=10, + forceOutput=True) # for printing 10 lines + print("TAOS service started") def stopTaosService(self, outputLines=20): - if not self.isRunning(): - logger.warning("Cannot stop TAOS service, not running") - return + with self._lock: + if not self.isRunning(): + logger.warning("Cannot stop TAOS service, not running") + return - print("Terminating Service Manager Thread (SMT) execution...") - self.svcMgrThread.stop() - if self.svcMgrThread.isStopped(): - self.svcMgrThread.procIpcBatch(outputLines) # one last time - self.svcMgrThread = None - print("----- End of TDengine Service Output -----\n") - print("SMT execution terminated") - else: - print("WARNING: SMT did not terminate as expected") + print("Terminating Service Manager Thread (SMT) execution...") + self.svcMgrThread.stop() + if self.svcMgrThread.isStopped(): + self.svcMgrThread.procIpcBatch(outputLines) # one last time + self.svcMgrThread = None + print("----- End of TDengine Service Output -----\n") + print("SMT execution terminated") + else: + print("WARNING: SMT did not terminate as expected") def run(self): self.startTaosService() - self._procIpcAll() # pump/process all the messages + self._procIpcAll() # pump/process all the messages, may encounter SIG + restart if self.isRunning(): # if sig handler hasn't destroyed it by now self.stopTaosService() # should have started already + def restart(self): + if self._isRestarting: + logger.warning("Cannot restart service when it's already restarting") + return + + self._isRestarting = True + if self.isRunning(): + self.stopTaosService() + else: + logger.warning("Service not running when restart requested") + + self.startTaosService() + self._isRestarting = False + def isRunning(self): return self.svcMgrThread != None + def isRestarting(self): + return self._isRestarting + class ServiceManagerThread: MAX_QUEUE_SIZE = 10000 @@ -2513,7 +2630,6 @@ class ClientManager: self.tc.printStats() self.tc.getDbManager().cleanUp() - class MainExec: STATUS_STARTING = 1 STATUS_RUNNING = 2 @@ -2541,16 +2657,29 @@ class MainExec: self._clientMgr.sigIntHandler(signalNumber, frame) def runClient(self): + global gSvcMgr if gConfig.auto_start_service: self._svcMgr = SvcManager() + gSvcMgr = self._svcMgr # hack alert self._svcMgr.startTaosService() # we start, don't run self._clientMgr = ClientManager() - ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside + ret = None + try: + ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside + except requests.exceptions.ConnectionError as err: + logger.warning("Failed to open REST connection to DB") + # don't raise + return ret def runService(self): + global gSvcMgr self._svcMgr = SvcManager() + gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert + self._svcMgr.run() # run to some end state + self._svcMgr = None + gSvcMgr = None def runTemp(self): # for debugging purposes # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix -- GitLab