From 5f979568e113fd83d4e3c28176d2985d7ae28e56 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 24 Jun 2020 21:59:13 -0700 Subject: [PATCH] Refactored and cleaned up crash_gen tool, ready to add sub-state transitions --- tests/pytest/crash_gen.py | 143 ++++++++++++++------------------------ 1 file changed, 52 insertions(+), 91 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index e933a865d0..bff2403bb8 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -69,7 +69,7 @@ class WorkerThread: # self._curStep = -1 self._pool = pool self._tid = tid - self._tc = tc + self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() @@ -161,13 +161,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState().getDbConn().execute(sql) + return self._tc.getDbManager().getDbConn().execute(sql) def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn else: - return self._tc.getDbState().getDbConn() + return self._tc.getDbManager().getDbConn() # def querySql(self, sql): # not "execute", since we are out side the DB context # if ( gConfig.per_thread_db_connection ): @@ -176,12 +176,12 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbState): + def __init__(self, pool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbState = dbState + self._dbManager = dbManager self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -191,8 +191,8 @@ class ThreadCoordinator: def getTaskExecutor(self): return self._te - def getDbState(self) -> DbState : - return self._dbState + def getDbManager(self) -> DbManager : + return self._dbManager def crossStepBarrier(self): self._stepBarrier.wait() @@ -216,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -289,8 +289,8 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbState().pickTaskType() # pick a task type for current state - return taskType(self.getDbState(), self._execStats) # create a task from it + taskType = self.getDbManager().pickTaskType() # pick a task type for current state + return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -301,16 +301,12 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. class ThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + def __init__(self, numThreads, maxSteps): self.numThreads = numThreads self.maxSteps = maxSteps - self.funcSequencer = funcSequencer # Internal class variables - # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -324,7 +320,8 @@ class ThreadPool: logger.debug("Joining thread...") workerThread._thread.join() -# A queue of continguous POSITIVE integers +# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers +# for new table names class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element @@ -600,9 +597,9 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB - if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks - self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers class StateDbOnly(AnyState): def getInfo(self): @@ -614,19 +611,19 @@ class StateDbOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( not self.hasTask(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more - self.assertIfExistThenSuccess(tasks, DropDbTask) + if ( not self.hasTask(tasks, TaskCreateDb) ): + self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more + self.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): - self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything + if ( not self.hasTask(tasks, TaskDropSuperTable) ): + self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -650,8 +647,8 @@ class StateSuperTableOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -675,28 +672,28 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): - self.hasSuccess(tasks, DropDbTask) - if ( not self.hasTask(tasks, CreateDbTask) ) : - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + self.hasSuccess(tasks, TaskDropDb) + if ( not self.hasTask(tasks, TaskCreateDb) ) : + self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only - if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task - self.assertNoTask(tasks, DropDbTask) # we must have drop_db task - self.hasSuccess(tasks, DropFixedSuperTableTask) + if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task + self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task + self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertNoTask(tasks, AddFixedDataTask) + self.assertNoTask(tasks, TaskDropDb) + self.assertNoTask(tasks, TaskDropSuperTable) + self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, DropDbTask) - if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table - self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it + self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table + self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -# State of the database as we believe it to be -class DbState(): +# Manager of the Database Data/Connection +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() @@ -879,11 +876,11 @@ class DbState(): # Generic Checks, first based on the start state if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, DropDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop # if self._state.canCreateFixedTable(): @@ -930,8 +927,8 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): - self._dbState = dbState + def __init__(self, dbManager: DbManager, execStats: ExecutionStats): + self._dbState = dbManager self._workerThread = None self._err = None self._curStep = None @@ -1075,7 +1072,7 @@ class StateTransitionTask(Task): -class CreateDbTask(StateTransitionTask): +class TaskCreateDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1090,7 +1087,7 @@ class CreateDbTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(StateTransitionTask): +class TaskDropDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1106,7 +1103,7 @@ class DropDbTask(StateTransitionTask): wt.execSql("drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) -class CreateFixedSuperTableTask(StateTransitionTask): +class TaskCreateSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1124,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask): # No need to create the regular tables, INSERT will do that automatically -class ReadFixedDataTask(StateTransitionTask): +class TaskReadData(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1151,7 +1148,7 @@ class ReadFixedDataTask(StateTransitionTask): # tdSql.query(" cars where tbname in ('carzero', 'carone')") -class DropFixedSuperTableTask(StateTransitionTask): +class TaskDropSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1167,7 +1164,7 @@ class DropFixedSuperTableTask(StateTransitionTask): tblName = self._dbState.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) -class AddFixedDataTask(StateTransitionTask): +class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on LARGE_NUMBER_OF_TABLES = 35 SMALL_NUMBER_OF_TABLES = 3 @@ -1233,42 +1230,6 @@ class AddFixedDataTask(StateTransitionTask): self.activeTable.discard(i) # not raising an error, unlike remove -#---------- Non State-Transition Related Tasks ----------# - -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) - -class DropTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self._dbState.getTableNameToDelete() - if ( not tableName ): # May be "False" - self.logInfo("Cannot generate a table to delete, skipping...") - return - self.logInfo("Dropping a table db.{} ...".format(tableName)) - wt.execSql("drop table db.{}".format(tableName)) - - - -class AddDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) - tIndex = ds.pickAndAllocateTable() - if ( tIndex == None ): - self.logInfo("No table found to add data, skipping...") - return - sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("[SQL] Executing SQL: {}".format(sql)) - wt.execSql(sql) - ds.releaseTable(tIndex) - self.logDebug("[OPS] Finished adding data") - - # Deterministic random number generator class Dice(): seeded = False # static, uninitialized @@ -1384,12 +1345,12 @@ def main(): # resetDb = False # DEBUG only # dbState = DbState(resetDb) # DBEUG only! - dbState = DbState() # Regular function + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( - ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + ThreadPool(gConfig.num_threads, gConfig.max_steps), # WorkDispatcher(dbState), # Obsolete? - dbState + dbManager ) # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix @@ -1437,7 +1398,7 @@ def main(): tc.run() tc.logStats() - dbState.cleanUp() + dbManager.cleanUp() # logger.info("Crash_Gen execution finished") -- GitLab