From 5b4e7a6d54881f14b5e218eb2620d013f9c78de5 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sun, 24 May 2020 15:15:05 -0700 Subject: [PATCH] refactoring, added exec stats, re-orged task/state relationships --- tests/pytest/crash_gen.py | 325 ++++++++++++++++++++++++++++---------- 1 file changed, 244 insertions(+), 81 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 56f01a3594..c060a5dae7 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -24,11 +24,13 @@ import copy import threading import random +import time import logging import datetime import textwrap from typing import List +from typing import Dict from util.log import * from util.dnodes import * @@ -156,18 +158,19 @@ class WorkerThread: return self._dbConn.query(sql) else: return self._tc.getDbState().getDbConn().query(sql) - + class ThreadCoordinator: - def __init__(self, pool, wd: WorkDispatcher, dbState): + def __init__(self, pool, dbState): self._curStep = -1 # first step is 0 self._pool = pool - self._wd = wd + # self._wd = wd self._te = None # prepare for every new step self._dbState = dbState self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads + self._execStats = ExecutionStats() def getTaskExecutor(self): return self._te @@ -184,7 +187,8 @@ class ThreadCoordinator: # Coordinate all threads step by step self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore - while(self._curStep < maxSteps): + startTime = time.time() + while(self._curStep < maxSteps-1): # maxStep==10, last curStep should be 9 print(".", end="", flush=True) logger.debug("Main thread going to sleep") @@ -218,6 +222,8 @@ class ThreadCoordinator: self._pool.joinAll() # Get all threads to finish logger.info("All threads finished") + self._execStats.logStats() + logger.info("Total Execution Time (task busy time, plus Python overhead): {:.2f} seconds".format(time.time() - startTime)) print("\r\nFinished") def tapAllThreads(self): # in a deterministic manner @@ -241,11 +247,15 @@ class ThreadCoordinator: raise RuntimeError("Cannot fetch task when not running") # return self._wd.pickTask() # Alternatively, let's ask the DbState for the appropriate task - dbState = self.getDbState() - tasks = dbState.getTasksAtState() - i = Dice.throw(len(tasks)) - # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. - return tasks[i].clone() + # dbState = self.getDbState() + # tasks = dbState.getTasksAtState() # TODO: create every time? + # nTasks = len(tasks) + # i = Dice.throw(nTasks) + # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) + # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + # return tasks[i].clone() # TODO: still necessary? + taskType = self.getDbState().pickTaskType() # pick a task type for current state + return taskType(self.getDbState(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -261,7 +271,7 @@ class ThreadPool: self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(dbState) + # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads @@ -409,17 +419,19 @@ class DbConn: # State of the database as we believe it to be class DbState(): STATE_INVALID = -1 - STATE_EMPTY = 1 # nothing there, no even a DB - STATE_DB_ONLY = 2 # we have a DB, but nothing else - STATE_TABLE_ONLY = 3 # we have a table, but totally empty - STATE_HAS_DATA = 4 # we have some data in the table + STATE_EMPTY = 0 # nothing there, no even a DB + STATE_DB_ONLY = 1 # we have a DB, but nothing else + STATE_TABLE_ONLY = 2 # we have a table, but totally empty + STATE_HAS_DATA = 3 # we have some data in the table def __init__(self): self.tableNumQueue = LinearQueue() self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() + self._state = self.STATE_INVALID + self._stateWeights = [1,3,5,10] # self.openDbServerConnection() self._dbConn = DbConn() @@ -478,25 +490,56 @@ class DbState(): def cleanUp(self): self._dbConn.close() - def getTasksAtState(self): - tasks = [] - tasks.append(ReadFixedDataTask(self)) # always for everybody - if ( self._state == self.STATE_EMPTY ): - tasks.append(CreateDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - elif ( self._state == self.STATE_DB_ONLY ): - tasks.append(DropDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_TABLE_ONLY ): - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - return tasks + def getTaskTypesAtState(self): + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + taskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._state): + taskTypes.append(tc) + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._state)) + return taskTypes + + # tasks.append(ReadFixedDataTask(self)) # always for everybody + # if ( self._state == self.STATE_EMPTY ): + # tasks.append(CreateDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # elif ( self._state == self.STATE_DB_ONLY ): + # tasks.append(DropDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_TABLE_ONLY ): + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # else: + # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + # return tasks + + def pickTaskType(self): + taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i + + def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty @@ -646,7 +689,7 @@ class Task(): cls.taskSn += 1 return cls.taskSn - def __init__(self, dbState: DbState): + def __init__(self, dbState: DbState, execStats: ExecutionStats): self._dbState = dbState self._workerThread = None self._err = None @@ -656,11 +699,13 @@ class Task(): # Assign an incremental task serial number self._taskNum = self.allocTaskNum() + self._execStats = execStats + def isSuccess(self): return self._err == None - def clone(self): - newTask = self.__class__(self._dbState) + def clone(self): # TODO: why do we need this again? + newTask = self.__class__(self._dbState, self._execStats) return newTask def logDebug(self, msg): @@ -681,6 +726,7 @@ class Task(): self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._err = None + self._execStats.beginTaskType(self.__class__.__name__) # mark beginning try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: @@ -689,39 +735,164 @@ class Task(): except: self.logDebug("[=]Unexpected exception") raise + self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) - self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): return self._dbState.execute(sql) -class CreateDbTask(Task): + +class ExecutionStats: + def __init__(self): + self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task + self._tasksInProgress = 0 + self._lock = threading.Lock() + self._firstTaskStartTime = None + self._accRunTime = 0.0 # accumulated run time + + def incExecCount(self, klassName, isSuccess): # TODO: add a lock here + if klassName not in self._execTimes: + self._execTimes[klassName] = [0, 0] + t = self._execTimes[klassName] # tuple for the data + t[0] += 1 # index 0 has the "total" execution times + if isSuccess: + t[1] += 1 # index 1 has the "success" execution times + + def beginTaskType(self, klassName): + with self._lock: + if self._tasksInProgress == 0 : # starting a new round + self._firstTaskStartTime = time.time() # I am now the first task + self._tasksInProgress += 1 + + def endTaskType(self, klassName, isSuccess): + with self._lock: + self._tasksInProgress -= 1 + if self._tasksInProgress == 0 : # all tasks have stopped + self._accRunTime += (time.time() - self._firstTaskStartTime) + self._firstTaskStartTime = None + + def logStats(self): + logger.info("Logging task execution stats (success/total times)...") + execTimesAny = 0 + for k, n in self._execTimes.items(): + execTimesAny += n[1] + logger.info(" {0:<24}: {1}/{2}".format(k,n[1],n[0])) + + logger.info("Total Tasks Executed (success or not): {} ".format(execTimesAny)) + logger.info("Total Tasks In Progress at End: {}".format(self._tasksInProgress)) + logger.info("Total Task Busy Time (elapsed time when any task is in progress): {:.2f} seconds".format(self._accRunTime)) + + +class StateTransitionTask(Task): + # @classmethod + # def getAllTaskClasses(cls): # static + # return cls.__subclasses__() + @classmethod + def getInfo(cls): # each sub class should supply their own information + raise RuntimeError("Overriding method expected") + + @classmethod + def getBeginStates(cls): + return cls.getInfo()[0] + + @classmethod + def getEndState(cls): + return cls.getInfo()[1] + + @classmethod + def canBeginFrom(cls, state): + return state in cls.getBeginStates() + + def execute(self, wt: WorkerThread): + super().execute(wt) + + + +class CreateDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_EMPTY], # can begin from + DbState.STATE_DB_ONLY # end state + ] + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(Task): +class DropDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_DB_ONLY, DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], + DbState.STATE_EMPTY + ] + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("drop database db") -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) +class CreateFixedTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_DB_ONLY], + DbState.STATE_TABLE_ONLY + ] -class CreateFixedTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tblName = self._dbState.getFixedTableName() wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) -class ReadFixedDataTask(Task): +class ReadFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], + None # meaning doesn't affect state + ] + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tblName = self._dbState.getFixedTableName() self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later # tdSql.query(" cars where tbname in ('carzero', 'carone')") +class DropFixedTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], + DbState.STATE_DB_ONLY # meaning doesn't affect state + ] + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + wt.execSql("drop table db.{}".format(tblName)) + +class AddFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], + DbState.STATE_HAS_DATA + ] + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) + wt.execSql(sql) + + +#---------- Non State-Transition Related Tasks ----------# + +class CreateTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tIndex = self._dbState.addTable() + self.logDebug("Creating a table {} ...".format(tIndex)) + wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) + self.logDebug("Table {} created.".format(tIndex)) + self._dbState.releaseTable(tIndex) + class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tableName = self._dbState.getTableNameToDelete() @@ -731,10 +902,7 @@ class DropTableTask(Task): self.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) -class DropFixedTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedTableName() - wt.execSql("drop table db.{}".format(tblName)) + class AddDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): @@ -750,11 +918,6 @@ class AddDataTask(Task): ds.releaseTable(tIndex) self.logDebug("Finished adding data") -class AddFixedDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) - wt.execSql(sql) # Deterministic random number generator class Dice(): @@ -789,28 +952,28 @@ class Dice(): # Anyone needing to carry out work should simply come here -class WorkDispatcher(): - def __init__(self, dbState): - # self.totalNumMethods = 2 - self.tasks = [ - CreateTableTask(dbState), - DropTableTask(dbState), - AddDataTask(dbState), - ] - - def throwDice(self): - max = len(self.tasks) - 1 - dRes = random.randint(0, max) - # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) - return dRes - - def pickTask(self): - dice = self.throwDice() - return self.tasks[dice] - - def doWork(self, workerThread): - task = self.pickTask() - task.execute(workerThread) +# class WorkDispatcher(): +# def __init__(self, dbState): +# # self.totalNumMethods = 2 +# self.tasks = [ +# # CreateTableTask(dbState), # Obsolete +# # DropTableTask(dbState), +# # AddDataTask(dbState), +# ] + +# def throwDice(self): +# max = len(self.tasks) - 1 +# dRes = random.randint(0, max) +# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) +# return dRes + +# def pickTask(self): +# dice = self.throwDice() +# return self.tasks[dice] + +# def doWork(self, workerThread): +# task = self.pickTask() +# task.execute(workerThread) def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -839,7 +1002,7 @@ def main(): sys.exit() global logger - logger = logging.getLogger('myApp') + logger = logging.getLogger('CrashGen') if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler() @@ -849,7 +1012,7 @@ def main(): Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), - WorkDispatcher(dbState), + # WorkDispatcher(dbState), # Obsolete? dbState ) tc.run() -- GitLab