diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c060a5dae70fed118b0340b4d040309cd3c99bb2..49c3c15a42a768216946ff4662022b16bde0b48b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import traceback # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -105,10 +106,11 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug("Thread task loop exited barrier...") + # logger.debug("Thread task loop exited barrier...") self.crossStepGate() # then per-thread gate, after being tapped - logger.debug("Thread task loop exited step gate...") + # logger.debug("Thread task loop exited step gate...") if not self._tc.isRunning(): + logger.debug("Thread Coordinator not running any more, worker thread now stopping...") break task = tc.fetchTask() @@ -143,7 +145,7 @@ class WorkerThread: self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - logger.debug("Tapping worker thread {}".format(self._tid)) + # logger.debug("Tapping worker thread {}".format(self._tid)) self._stepGate.set() # wake up! time.sleep(0) # let the released thread run a bit @@ -153,11 +155,11 @@ class WorkerThread: else: return self._tc.getDbState().getDbConn().execute(sql) - def querySql(self, sql): # not "execute", since we are out side the DB context - if ( gConfig.per_thread_db_connection ): - return self._dbConn.query(sql) - else: - return self._tc.getDbState().getDbConn().query(sql) + # def querySql(self, sql): # not "execute", since we are out side the DB context + # if ( gConfig.per_thread_db_connection ): + # return self._dbConn.query(sql) + # else: + # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: def __init__(self, pool, dbState): @@ -187,8 +189,9 @@ class ThreadCoordinator: # Coordinate all threads step by step self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore - startTime = time.time() - while(self._curStep < maxSteps-1): # maxStep==10, last curStep should be 9 + self._execStats.startExec() # start the stop watch + failed = False + while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 print(".", end="", flush=True) logger.debug("Main thread going to sleep") @@ -197,7 +200,21 @@ class ThreadCoordinator: self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + try: + self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + failed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + finally: + pass + self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step @@ -206,25 +223,28 @@ class ThreadCoordinator: logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - self._te = TaskExecutor(self._curStep) + if not failed: # only if not failed + self._te = TaskExecutor(self._curStep) logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() logger.debug("Main thread ready to finish up...") - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + if not failed: # only in regular situations + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish - logger.info("All threads finished") + self._execStats.endExec() + + def logStats(self): self._execStats.logStats() - logger.info("Total Execution Time (task busy time, plus Python overhead): {:.2f} seconds".format(time.time() - startTime)) - print("\r\nFinished") def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -380,7 +400,7 @@ class DbConn: # Get the connection/cursor ready self._cursor.execute('reset query cache') - # self._cursor.execute('use db') + # self._cursor.execute('use db') # note we do this in _findCurrenState # Open connection self._tdSql = TDSql() @@ -410,27 +430,231 @@ class DbConn: raise RuntimeError("Cannot execute database commands until connection is open") return self._tdSql.execute(sql) - def query(self, sql) -> int : # return number of rows retrieved + def query(self, sql) : # return rows affected if ( not self.isOpen ): raise RuntimeError("Cannot query database until connection is open") return self._tdSql.query(sql) + # results are in: return self._tdSql.queryResult - -# State of the database as we believe it to be -class DbState(): + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + tSql = self._tdSql + nRows = tSql.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if tSql.queryRows != 1 or tSql.queryCols != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return tSql.queryResult[0][0] + + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + +class AnyState: STATE_INVALID = -1 STATE_EMPTY = 0 # nothing there, no even a DB STATE_DB_ONLY = 1 # we have a DB, but nothing else STATE_TABLE_ONLY = 2 # we have a table, but totally empty STATE_HAS_DATA = 3 # we have some data in the table + _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"] + + STATE_VAL_IDX = 0 + CAN_CREATE_DB = 1 + CAN_DROP_DB = 2 + CAN_CREATE_FIXED_TABLE = 3 + CAN_DROP_FIXED_TABLE = 4 + CAN_ADD_DATA = 5 + CAN_READ_DATA = 6 + + def __init__(self): + self._info = self.getInfo() + + def __str__(self): + return self._stateNames[self._info[self.STATE_VAL_IDX] - 1] # -1 hack to accomodate the STATE_INVALID case + + def getInfo(self): + raise RuntimeError("Must be overriden by child classes") + + def verifyTasksToState(self, tasks, newState): + raise RuntimeError("Must be overriden by child classes") + + def getValue(self): + return self._info[self.STATE_VAL_IDX] + def canCreateDb(self): + return self._info[self.CAN_CREATE_DB] + def canDropDb(self): + return self._info[self.CAN_DROP_DB] + def canCreateFixedTable(self): + return self._info[self.CAN_CREATE_FIXED_TABLE] + def canDropFixedTable(self): + return self._info[self.CAN_DROP_FIXED_TABLE] + def canAddData(self): + return self._info[self.CAN_ADD_DATA] + def canReadData(self): + return self._info[self.CAN_READ_DATA] + + def assertAtMostOneSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + task.logDebug("Task success found") + sCnt += 1 + if ( sCnt >= 2 ): + raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + + def assertIfExistThenSuccess(self, tasks, cls): + sCnt = 0 + exists = False + for task in tasks : + if not isinstance(task, cls): + continue + exists = True # we have a valid instance + if task.isSuccess(): + sCnt += 1 + if ( exists and sCnt <= 0 ): + raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + + def assertNoTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__)) + + def assertNoSuccess(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + if task.isSuccess(): + raise RuntimeError("Unexpected successful task: {}".format(cls)) + + def hasSuccess(self, tasks, cls): + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + return True + return False + +class StateInvalid(AnyState): + def getInfo(self): + return [ + self.STATE_INVALID, + False, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + # def verifyTasksToState(self, tasks, newState): + +class StateEmpty(AnyState): + def getInfo(self): + return [ + self.STATE_EMPTY, + True, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, CreateDbTask) ): + self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really valid for massively parrallel tasks + +class StateDbOnly(AnyState): + def getInfo(self): + return [ + self.STATE_DB_ONLY, + False, True, + True, False, + False, False, + ] + + def verifyTasksToState(self, tasks, newState): + self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases + self.assertIfExistThenSuccess(tasks, DropDbTask) + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) + # Nothing to be said about adding data task + if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess + self.assertAtMostOneSuccess(tasks, DropDbTask) + # self._state = self.STATE_EMPTY + elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success + # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful + self.assertNoTask(tasks, DropDbTask) # should have have tried + # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet + # # can't say there's add-data attempts, since they may all fail + # self._state = self.STATE_TABLE_ONLY + # else: + # self._state = self.STATE_HAS_DATA + # What about AddFixedData? + # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): + # self._state = self.STATE_HAS_DATA + # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail + # # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, + # self._state = self.STATE_DB_ONLY # no change + +class StateTableOnly(AnyState): + def getInfo(self): + return [ + self.STATE_TABLE_ONLY, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + # self._state = self.STATE_DB_ONLY + elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data + self.assertNoTask(tasks, DropFixedTableTask) + # self._state = self.STATE_HAS_DATA + elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data + self.assertNoTask(tasks, DropFixedTableTask) + self.assertNoTask(tasks, AddFixedDataTask) + # self._state = self.STATE_TABLE_ONLY # no change + else: # did not drop table, did not insert data, did not read successfully, that is impossible + raise RuntimeError("Unexpected no-success scenarios") + +class StateHasData(AnyState): + def getInfo(self): + return [ + self.STATE_HAS_DATA, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, DropFixedTableTask) ): + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + # self._state = self.STATE_DB_ONLY + else: # no success dropping the table, table remains intact in this step + self.assertNoTask(tasks, DropFixedTableTask) # we should not have had such a task + + if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # added data + # self._state = self.STATE_HAS_DATA + # else: + self.assertNoTask(tasks, AddFixedDataTask) + if ( not self.hasSuccess(tasks, ReadFixedDataTask) ): # simple able to read some data + # which is ok, then no state change + # self._state = self.STATE_HAS_DATA # no change + # else: # did not drop table, did not insert data, that is impossible? yeah, we might only had ReadData task + raise RuntimeError("Unexpected no-success scenarios") +# State of the database as we believe it to be +class DbState(): + def __init__(self): self.tableNumQueue = LinearQueue() self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - self._state = self.STATE_INVALID + self._state = StateInvalid() # starting state self._stateWeights = [1,3,5,10] # self.openDbServerConnection() @@ -448,7 +672,7 @@ class DbState(): print("[=]Unexpected exception") raise self._dbConn.resetDb() # drop and recreate DB - self._state = self.STATE_EMPTY # initial state, the result of above + self._state = StateEmpty() # initial state, the result of above def getDbConn(self): return self._dbConn @@ -484,9 +708,6 @@ class DbState(): return "table_{}".format(tblNum) - def execSql(self, sql): # using the main DB connection - return self._dbConn.execute(sql) - def cleanUp(self): self._dbConn.close() @@ -529,7 +750,7 @@ class DbState(): else: weights.append(10) # read data task, default to 10: TODO: change to a constant i = self._weighted_choice_sub(weights) - logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) return taskTypes[i] def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ @@ -539,131 +760,52 @@ class DbState(): if rnd < 0: return i + def _findCurrentState(self): + dbc = self._dbConn + if dbc.query("show databases") == 0 : # no database?! + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + return StateDbOnly() + if dbc.query("SELECT * FROM {}".format(self.getFixedTableName()) ) == 0 : # no data + return StateTableOnly() + else: + return StateHasData() - def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty return # do nothing - self.execSql("show dnodes") # this should show up in the server log, separating steps - - if ( self._state == self.STATE_EMPTY ): - # self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table - if ( self.hasSuccess(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class - self._state = self.STATE_DB_ONLY - if ( self.hasSuccess(tasks, CreateFixedTableTask )): - self._state = self.STATE_TABLE_ONLY - # else: # no successful table creation, not much we can say, as it is step 2 - else: # did not create db - self.assertNoTask(tasks, CreateDbTask) # because we did not have such task - # self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task - self.assertNoSuccess(tasks, CreateFixedTableTask) - - elif ( self._state == self.STATE_DB_ONLY ): - self.assertAtMostOneSuccess(tasks, DropDbTask) - self.assertIfExistThenSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) - # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB - # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) - self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful - self.assertNoTask(tasks, DropDbTask) # should have have tried - if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet - # can't say there's add-data attempts, since they may all fail - self._state = self.STATE_TABLE_ONLY - else: - self._state = self.STATE_HAS_DATA - # What about AddFixedData? - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): - self._state = self.STATE_HAS_DATA - else: # no success in dropping db tasks, no success in create fixed table? read data should also fail - # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, - self._state = self.STATE_DB_ONLY # no change - - elif ( self._state == self.STATE_TABLE_ONLY ): - if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data - self.assertNoTask(tasks, DropFixedTableTask) - self._state = self.STATE_HAS_DATA - elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data - self.assertNoTask(tasks, DropFixedTableTask) - self.assertNoTask(tasks, AddFixedDataTask) - self._state = self.STATE_TABLE_ONLY # no change - else: # did not drop table, did not insert data, did not read successfully, that is impossible - raise RuntimeError("Unexpected no-success scenarios") + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust - if ( self.hasSuccess(tasks, DropFixedTableTask) ): - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - else: # no success dropping the table, table remains intact in this step - self.assertNoTask(tasks, DropFixedTableTask) # we should not have had such a task + # Generic Checks, first based on the start state + if self._state.canCreateDb(): + self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - if ( self.hasSuccess(tasks, AddFixedDataTask) ): # added data - self._state = self.STATE_HAS_DATA - else: - self.assertNoTask(tasks, AddFixedDataTask) + if self._state.canDropDb(): + self._state.assertIfExistThenSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - if ( self.hasSuccess(tasks, ReadFixedDataTask) ): # simple able to read some data - # which is ok, then no state change - self._state = self.STATE_HAS_DATA # no change - else: # did not drop table, did not insert data, that is impossible? yeah, we might only had ReadData task - raise RuntimeError("Unexpected no-success scenarios") + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - logger.debug("New DB state is: {}".format(self._state)) + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - def assertAtMostOneSuccess(self, tasks, cls): - sCnt = 0 - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - task.logDebug("Task success found") - sCnt += 1 - if ( sCnt >= 2 ): - raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually - def assertIfExistThenSuccess(self, tasks, cls): - sCnt = 0 - exists = False - for task in tasks : - if not isinstance(task, cls): - continue - exists = True # we have a valid instance - if task.isSuccess(): - sCnt += 1 - if ( exists and sCnt <= 0 ): - raise RuntimeError("Unexpected zero success for task: {}".format(cls)) - - def assertNoTask(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__)) - - def assertNoSuccess(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - if task.isSuccess(): - raise RuntimeError("Unexpected successful task: {}".format(cls)) - - def hasSuccess(self, tasks, cls): - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - return True - return False + # if self._state.canReadData(): + # Nothing for sure + newState = self._findCurrentState() + self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? + self._state = newState + logger.debug("New DB state is: {}".format(self._state)) class TaskExecutor(): def __init__(self, curStep): @@ -750,8 +892,19 @@ class ExecutionStats: self._tasksInProgress = 0 self._lock = threading.Lock() self._firstTaskStartTime = None + self._execStartTime = None + self._elapsedTime = 0.0 # total elapsed time self._accRunTime = 0.0 # accumulated run time + self._failed = False + self._failureReason = None + + def startExec(self): + self._execStartTime = time.time() + + def endExec(self): + self._elapsedTime = time.time() - self._execStartTime + def incExecCount(self, klassName, isSuccess): # TODO: add a lock here if klassName not in self._execTimes: self._execTimes[klassName] = [0, 0] @@ -773,16 +926,27 @@ class ExecutionStats: self._accRunTime += (time.time() - self._firstTaskStartTime) self._firstTaskStartTime = None + def registerFailure(self, reason): + self._failed = True + self._failureReason = reason + def logStats(self): - logger.info("Logging task execution stats (success/total times)...") + logger.info("----------------------------------------------------------------------") + logger.info("| Crash_Gen test {}, with the following stats:". + format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) + logger.info("| Task Execution Times (success/total):") execTimesAny = 0 for k, n in self._execTimes.items(): execTimesAny += n[1] - logger.info(" {0:<24}: {1}/{2}".format(k,n[1],n[0])) + logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0])) - logger.info("Total Tasks Executed (success or not): {} ".format(execTimesAny)) - logger.info("Total Tasks In Progress at End: {}".format(self._tasksInProgress)) - logger.info("Total Task Busy Time (elapsed time when any task is in progress): {:.2f} seconds".format(self._accRunTime)) + logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny)) + logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress)) + logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) + logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) + logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("----------------------------------------------------------------------") + class StateTransitionTask(Task): @@ -793,17 +957,18 @@ class StateTransitionTask(Task): def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") - @classmethod - def getBeginStates(cls): - return cls.getInfo()[0] + # @classmethod + # def getBeginStates(cls): + # return cls.getInfo()[0] @classmethod def getEndState(cls): - return cls.getInfo()[1] + return cls.getInfo()[0] @classmethod - def canBeginFrom(cls, state): - return state in cls.getBeginStates() + def canBeginFrom(cls, state: AnyState): + # return state.getValue() in cls.getBeginStates() + raise RuntimeError("must be overriden") def execute(self, wt: WorkerThread): super().execute(wt) @@ -814,10 +979,14 @@ class CreateDbTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_EMPTY], # can begin from - DbState.STATE_DB_ONLY # end state + # [AnyState.STATE_EMPTY], # can begin from + AnyState.STATE_DB_ONLY # end state ] + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateDb() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") @@ -825,10 +994,14 @@ class DropDbTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_DB_ONLY, DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], - DbState.STATE_EMPTY + # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + AnyState.STATE_EMPTY ] + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropDb() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("drop database db") @@ -836,10 +1009,14 @@ class CreateFixedTableTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_DB_ONLY], - DbState.STATE_TABLE_ONLY + # [AnyState.STATE_DB_ONLY], + AnyState.STATE_TABLE_ONLY ] + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateFixedTable() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tblName = self._dbState.getFixedTableName() wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) @@ -848,23 +1025,31 @@ class ReadFixedDataTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], None # meaning doesn't affect state ] + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canReadData() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tblName = self._dbState.getFixedTableName() - self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later + wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later # tdSql.query(" cars where tbname in ('carzero', 'carone')") class DropFixedTableTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], - DbState.STATE_DB_ONLY # meaning doesn't affect state + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + AnyState.STATE_DB_ONLY # meaning doesn't affect state ] + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedTable() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tblName = self._dbState.getFixedTableName() wt.execSql("drop table db.{}".format(tblName)) @@ -873,9 +1058,13 @@ class AddFixedDataTask(StateTransitionTask): @classmethod def getInfo(cls): return [ - [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA], - DbState.STATE_HAS_DATA + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + AnyState.STATE_HAS_DATA ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState @@ -1015,8 +1204,11 @@ def main(): # WorkDispatcher(dbState), # Obsolete? dbState ) + tc.run() - dbState.cleanUp() + tc.logStats() + dbState.cleanUp() + logger.info("Finished running thread pool") if __name__ == "__main__":