diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 49c3c15a42a768216946ff4662022b16bde0b48b..8a0a043fb0290859e40d63ea151b46d60613334d 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -74,10 +74,10 @@ class WorkerThread: self._dbConn = DbConn() def logDebug(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.info(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.info(" TRD[{}] {}".format(self._tid, msg)) def getTaskExecutor(self): @@ -106,16 +106,19 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - # logger.debug("Thread task loop exited barrier...") + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - # logger.debug("Thread task loop exited step gate...") + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): - logger.debug("Thread Coordinator not running any more, worker thread now stopping...") + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + logger.debug("[TRD] Worker thread [{}] about to execute task".format(self._tid)) task.execute(self) tc.saveExecutedTask(task) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -135,7 +138,7 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # Wait again at the "gate", waiting to be "tapped" - # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) + logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() @@ -145,7 +148,7 @@ class WorkerThread: self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - # logger.debug("Tapping worker thread {}".format(self._tid)) + logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) self._stepGate.set() # wake up! time.sleep(0) # let the released thread run a bit @@ -192,8 +195,9 @@ class ThreadCoordinator: self._execStats.startExec() # start the stop watch failed = False while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 - print(".", end="", flush=True) - logger.debug("Main thread going to sleep") + if not gConfig.debug: + print(".", end="", flush=True) # print this only if we are not in debug mode + logger.debug("[TRD] Main thread going to sleep") # Now ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate @@ -226,7 +230,7 @@ class ThreadCoordinator: if not failed: # only if not failed self._te = TaskExecutor(self._curStep) - logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() logger.debug("Main thread ready to finish up...") @@ -253,7 +257,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.info("Waking up threads: {}".format(str(wakeSeq))) + logger.info("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -473,7 +477,7 @@ class AnyState: self._info = self.getInfo() def __str__(self): - return self._stateNames[self._info[self.STATE_VAL_IDX] - 1] # -1 hack to accomodate the STATE_INVALID case + return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case def getInfo(self): raise RuntimeError("Must be overriden by child classes") @@ -481,6 +485,9 @@ class AnyState: def verifyTasksToState(self, tasks, newState): raise RuntimeError("Must be overriden by child classes") + def getValIndex(self): + return self._info[self.STATE_VAL_IDX] + def getValue(self): return self._info[self.STATE_VAL_IDX] def canCreateDb(self): @@ -502,7 +509,7 @@ class AnyState: if not isinstance(task, cls): continue if task.isSuccess(): - task.logDebug("Task success found") + # task.logDebug("Task success found") sCnt += 1 if ( sCnt >= 2 ): raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) @@ -669,7 +676,7 @@ class DbState(): else: raise except: - print("[=]Unexpected exception") + print("[=] Unexpected exception") raise self._dbConn.resetDb() # drop and recreate DB self._state = StateEmpty() # initial state, the result of above @@ -711,15 +718,26 @@ class DbState(): def cleanUp(self): self._dbConn.close() - def getTaskTypesAtState(self): + # May be slow, use cautionsly... + def getTaskTypesAtState(self): allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks - taskTypes = [] + firstTaskTypes = [] for tc in allTaskClasses: - # t = tc(self) # create task object + # t = tc(self) # create task object if tc.canBeginFrom(self._state): - taskTypes.append(tc) + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: + continue + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (endState not in firstTaskTypes): + taskTypes.append(tc) # gather it + if len(taskTypes) <= 0: - raise RuntimeError("No suitable task types found for state: {}".format(self._state)) + raise RuntimeError("No suitable task types found for state: {}".format(self._state)) return taskTypes # tasks.append(ReadFixedDataTask(self)) # always for everybody @@ -746,7 +764,7 @@ class DbState(): for tt in taskTypes: endState = tt.getEndState() if endState != None : - weights.append(self._stateWeights[endState]) # TODO: change to a method + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method else: weights.append(10) # read data task, default to 10: TODO: change to a constant i = self._weighted_choice_sub(weights) @@ -763,13 +781,17 @@ class DbState(): def _findCurrentState(self): dbc = self._dbConn if dbc.query("show databases") == 0 : # no database?! + # logger.debug("Found EMPTY state") return StateEmpty() dbc.execute("use db") # did not do this when openning connection if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") return StateDbOnly() - if dbc.query("SELECT * FROM {}".format(self.getFixedTableName()) ) == 0 : # no data + if dbc.query("SELECT * FROM db.{}".format(self.getFixedTableName()) ) == 0 : # no data + # logger.debug("Found TABLE_ONLY state") return StateTableOnly() else: + # logger.debug("Found HAS_DATA state") return StateHasData() def transition(self, tasks): @@ -828,10 +850,11 @@ class Task(): @classmethod def allocTaskNum(cls): - cls.taskSn += 1 - return cls.taskSn + Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy + # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) + return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): + def __init__(self, dbState: DbState, execStats: ExecutionStats): self._dbState = dbState self._workerThread = None self._err = None @@ -840,6 +863,7 @@ class Task(): # Assign an incremental task serial number self._taskNum = self.allocTaskNum() + # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats @@ -851,10 +875,10 @@ class Task(): return newTask def logDebug(self, msg): - self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def logInfo(self, msg): - self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) @@ -872,10 +896,10 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=]Taos Execution exception: {0}".format(err)) + self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err)) self._err = err except: - self.logDebug("[=]Unexpected exception") + self.logDebug("[=] Unexpected exception") raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) @@ -962,7 +986,7 @@ class StateTransitionTask(Task): # return cls.getInfo()[0] @classmethod - def getEndState(cls): + def getEndState(cls): # returning the class name return cls.getInfo()[0] @classmethod @@ -980,7 +1004,7 @@ class CreateDbTask(StateTransitionTask): def getInfo(cls): return [ # [AnyState.STATE_EMPTY], # can begin from - AnyState.STATE_DB_ONLY # end state + StateDbOnly() # end state ] @classmethod @@ -995,7 +1019,7 @@ class DropDbTask(StateTransitionTask): def getInfo(cls): return [ # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - AnyState.STATE_EMPTY + StateEmpty() ] @classmethod @@ -1010,7 +1034,7 @@ class CreateFixedTableTask(StateTransitionTask): def getInfo(cls): return [ # [AnyState.STATE_DB_ONLY], - AnyState.STATE_TABLE_ONLY + StateTableOnly() ] @classmethod @@ -1043,7 +1067,7 @@ class DropFixedTableTask(StateTransitionTask): def getInfo(cls): return [ # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - AnyState.STATE_DB_ONLY # meaning doesn't affect state + StateDbOnly() # meaning doesn't affect state ] @classmethod @@ -1059,7 +1083,7 @@ class AddFixedDataTask(StateTransitionTask): def getInfo(cls): return [ # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - AnyState.STATE_HAS_DATA + StateHasData() ] @classmethod @@ -1164,6 +1188,17 @@ class Dice(): # task = self.pickTask() # task.execute(workerThread) +class LoggingFilter(logging.Filter): + def filter(self, record: logging.LogRecord): + msg = record.msg + # print("type = {}, value={}".format(type(msg), msg)) + # sys.exit() + if msg.startswith("[TRD]"): + return False + return True + + + def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html parser = argparse.ArgumentParser( @@ -1192,6 +1227,7 @@ def main(): global logger logger = logging.getLogger('CrashGen') + logger.addFilter(LoggingFilter()) if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler()