提交 5f979568 编写于 作者: S Steven Li

Refactored and cleaned up crash_gen tool, ready to add sub-state transitions

上级 c44bb0c6
...@@ -69,7 +69,7 @@ class WorkerThread: ...@@ -69,7 +69,7 @@ class WorkerThread:
# self._curStep = -1 # self._curStep = -1
self._pool = pool self._pool = pool
self._tid = tid self._tid = tid
self._tc = tc self._tc = tc # type: ThreadCoordinator
# self.threadIdent = threading.get_ident() # self.threadIdent = threading.get_ident()
self._thread = threading.Thread(target=runThread, args=(self,)) self._thread = threading.Thread(target=runThread, args=(self,))
self._stepGate = threading.Event() self._stepGate = threading.Event()
...@@ -161,13 +161,13 @@ class WorkerThread: ...@@ -161,13 +161,13 @@ class WorkerThread:
if ( gConfig.per_thread_db_connection ): if ( gConfig.per_thread_db_connection ):
return self._dbConn.execute(sql) return self._dbConn.execute(sql)
else: else:
return self._tc.getDbState().getDbConn().execute(sql) return self._tc.getDbManager().getDbConn().execute(sql)
def getDbConn(self): def getDbConn(self):
if ( gConfig.per_thread_db_connection ): if ( gConfig.per_thread_db_connection ):
return self._dbConn return self._dbConn
else: else:
return self._tc.getDbState().getDbConn() return self._tc.getDbManager().getDbConn()
# def querySql(self, sql): # not "execute", since we are out side the DB context # def querySql(self, sql): # not "execute", since we are out side the DB context
# if ( gConfig.per_thread_db_connection ): # if ( gConfig.per_thread_db_connection ):
...@@ -176,12 +176,12 @@ class WorkerThread: ...@@ -176,12 +176,12 @@ class WorkerThread:
# return self._tc.getDbState().getDbConn().query(sql) # return self._tc.getDbState().getDbConn().query(sql)
class ThreadCoordinator: class ThreadCoordinator:
def __init__(self, pool, dbState): def __init__(self, pool, dbManager):
self._curStep = -1 # first step is 0 self._curStep = -1 # first step is 0
self._pool = pool self._pool = pool
# self._wd = wd # self._wd = wd
self._te = None # prepare for every new step self._te = None # prepare for every new step
self._dbState = dbState self._dbManager = dbManager
self._executedTasks: List[Task] = [] # in a given step self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things self._lock = threading.RLock() # sync access for a few things
...@@ -191,8 +191,8 @@ class ThreadCoordinator: ...@@ -191,8 +191,8 @@ class ThreadCoordinator:
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
def getDbState(self) -> DbState : def getDbManager(self) -> DbManager :
return self._dbState return self._dbManager
def crossStepBarrier(self): def crossStepBarrier(self):
self._stepBarrier.wait() self._stepBarrier.wait()
...@@ -216,7 +216,7 @@ class ThreadCoordinator: ...@@ -216,7 +216,7 @@ class ThreadCoordinator:
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
try: try:
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed") logger.info("DB connection broken, execution failed")
...@@ -289,8 +289,8 @@ class ThreadCoordinator: ...@@ -289,8 +289,8 @@ class ThreadCoordinator:
# logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary? # return tasks[i].clone() # TODO: still necessary?
taskType = self.getDbState().pickTaskType() # pick a task type for current state taskType = self.getDbManager().pickTaskType() # pick a task type for current state
return taskType(self.getDbState(), self._execStats) # create a task from it return taskType(self.getDbManager(), self._execStats) # create a task from it
def resetExecutedTasks(self): def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread self._executedTasks = [] # should be under single thread
...@@ -301,16 +301,12 @@ class ThreadCoordinator: ...@@ -301,16 +301,12 @@ class ThreadCoordinator:
# We define a class to run a number of threads in locking steps. # We define a class to run a number of threads in locking steps.
class ThreadPool: class ThreadPool:
def __init__(self, dbState, numThreads, maxSteps, funcSequencer): def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads self.numThreads = numThreads
self.maxSteps = maxSteps self.maxSteps = maxSteps
self.funcSequencer = funcSequencer
# Internal class variables # Internal class variables
# self.dispatcher = WorkDispatcher(dbState) # Obsolete?
self.curStep = 0 self.curStep = 0
self.threadList = [] self.threadList = []
# self.stepGate = threading.Condition() # Gate to hold/sync all threads
# self.numWaitingThreads = 0
# starting to run all the threads, in locking steps # starting to run all the threads, in locking steps
def createAndStartThreads(self, tc: ThreadCoordinator): def createAndStartThreads(self, tc: ThreadCoordinator):
...@@ -324,7 +320,8 @@ class ThreadPool: ...@@ -324,7 +320,8 @@ class ThreadPool:
logger.debug("Joining thread...") logger.debug("Joining thread...")
workerThread._thread.join() workerThread._thread.join()
# A queue of continguous POSITIVE integers # A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
class LinearQueue(): class LinearQueue():
def __init__(self): def __init__(self):
self.firstIndex = 1 # 1st ever element self.firstIndex = 1 # 1st ever element
...@@ -600,9 +597,9 @@ class StateEmpty(AnyState): ...@@ -600,9 +597,9 @@ class StateEmpty(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB
if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks
self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers
class StateDbOnly(AnyState): class StateDbOnly(AnyState):
def getInfo(self): def getInfo(self):
...@@ -614,19 +611,19 @@ class StateDbOnly(AnyState): ...@@ -614,19 +611,19 @@ class StateDbOnly(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( not self.hasTask(tasks, CreateDbTask) ): if ( not self.hasTask(tasks, TaskCreateDb) ):
self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more
self.assertIfExistThenSuccess(tasks, DropDbTask) self.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
# Nothing to be said about adding data task # Nothing to be said about adding data task
# if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
# self.assertAtMostOneSuccess(tasks, DropDbTask) # self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY # self._state = self.STATE_EMPTY
if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): if ( not self.hasTask(tasks, TaskDropSuperTable) ):
self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should have have tried # self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail # # can't say there's add-data attempts, since they may all fail
...@@ -650,8 +647,8 @@ class StateSuperTableOnly(AnyState): ...@@ -650,8 +647,8 @@ class StateSuperTableOnly(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
# self._state = self.STATE_DB_ONLY # self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
...@@ -675,28 +672,28 @@ class StateHasData(AnyState): ...@@ -675,28 +672,28 @@ class StateHasData(AnyState):
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( newState.equals(AnyState.STATE_EMPTY) ): if ( newState.equals(AnyState.STATE_EMPTY) ):
self.hasSuccess(tasks, DropDbTask) self.hasSuccess(tasks, TaskDropDb)
if ( not self.hasTask(tasks, CreateDbTask) ) : if ( not self.hasTask(tasks, TaskCreateDb) ) :
self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy
elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only
if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task
self.assertNoTask(tasks, DropDbTask) # we must have drop_db task self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task
self.hasSuccess(tasks, DropFixedSuperTableTask) self.hasSuccess(tasks, TaskDropSuperTable)
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, DropDbTask) self.assertNoTask(tasks, TaskDropDb)
self.assertNoTask(tasks, DropFixedSuperTableTask) self.assertNoTask(tasks, TaskDropSuperTable)
self.assertNoTask(tasks, AddFixedDataTask) self.assertNoTask(tasks, TaskAddData)
# self.hasSuccess(tasks, DeleteDataTasks) # self.hasSuccess(tasks, DeleteDataTasks)
else: # should be STATE_HAS_DATA else: # should be STATE_HAS_DATA
self.assertNoTask(tasks, DropDbTask) self.assertNoTask(tasks, TaskDropDb)
if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
# State of the database as we believe it to be # Manager of the Database Data/Connection
class DbState(): class DbManager():
def __init__(self, resetDb = True): def __init__(self, resetDb = True):
self.tableNumQueue = LinearQueue() self.tableNumQueue = LinearQueue()
...@@ -879,11 +876,11 @@ class DbState(): ...@@ -879,11 +876,11 @@ class DbState():
# Generic Checks, first based on the start state # Generic Checks, first based on the start state
if self._state.canCreateDb(): if self._state.canCreateDb():
self._state.assertIfExistThenSuccess(tasks, CreateDbTask) self._state.assertIfExistThenSuccess(tasks, TaskCreateDb)
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
if self._state.canDropDb(): if self._state.canDropDb():
self._state.assertIfExistThenSuccess(tasks, DropDbTask) self._state.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
# if self._state.canCreateFixedTable(): # if self._state.canCreateFixedTable():
...@@ -930,8 +927,8 @@ class Task(): ...@@ -930,8 +927,8 @@ class Task():
# logger.debug("Allocating taskSN: {}".format(Task.taskSn)) # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
return Task.taskSn return Task.taskSn
def __init__(self, dbState: DbState, execStats: ExecutionStats): def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
self._dbState = dbState self._dbState = dbManager
self._workerThread = None self._workerThread = None
self._err = None self._err = None
self._curStep = None self._curStep = None
...@@ -1075,7 +1072,7 @@ class StateTransitionTask(Task): ...@@ -1075,7 +1072,7 @@ class StateTransitionTask(Task):
class CreateDbTask(StateTransitionTask): class TaskCreateDb(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1090,7 +1087,7 @@ class CreateDbTask(StateTransitionTask): ...@@ -1090,7 +1087,7 @@ class CreateDbTask(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db") wt.execSql("create database db")
class DropDbTask(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1106,7 +1103,7 @@ class DropDbTask(StateTransitionTask): ...@@ -1106,7 +1103,7 @@ class DropDbTask(StateTransitionTask):
wt.execSql("drop database db") wt.execSql("drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time())) logger.debug("[OPS] database dropped at {}".format(time.time()))
class CreateFixedSuperTableTask(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1124,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask): ...@@ -1124,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask):
# No need to create the regular tables, INSERT will do that automatically # No need to create the regular tables, INSERT will do that automatically
class ReadFixedDataTask(StateTransitionTask): class TaskReadData(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1151,7 +1148,7 @@ class ReadFixedDataTask(StateTransitionTask): ...@@ -1151,7 +1148,7 @@ class ReadFixedDataTask(StateTransitionTask):
# tdSql.query(" cars where tbname in ('carzero', 'carone')") # tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropFixedSuperTableTask(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1167,7 +1164,7 @@ class DropFixedSuperTableTask(StateTransitionTask): ...@@ -1167,7 +1164,7 @@ class DropFixedSuperTableTask(StateTransitionTask):
tblName = self._dbState.getFixedSuperTableName() tblName = self._dbState.getFixedSuperTableName()
wt.execSql("drop table db.{}".format(tblName)) wt.execSql("drop table db.{}".format(tblName))
class AddFixedDataTask(StateTransitionTask): class TaskAddData(StateTransitionTask):
activeTable : Set[int] = set() # Track which table is being actively worked on activeTable : Set[int] = set() # Track which table is being actively worked on
LARGE_NUMBER_OF_TABLES = 35 LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3 SMALL_NUMBER_OF_TABLES = 3
...@@ -1233,42 +1230,6 @@ class AddFixedDataTask(StateTransitionTask): ...@@ -1233,42 +1230,6 @@ class AddFixedDataTask(StateTransitionTask):
self.activeTable.discard(i) # not raising an error, unlike remove self.activeTable.discard(i) # not raising an error, unlike remove
#---------- Non State-Transition Related Tasks ----------#
class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tIndex = self._dbState.addTable()
self.logDebug("Creating a table {} ...".format(tIndex))
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
self.logDebug("Table {} created.".format(tIndex))
self._dbState.releaseTable(tIndex)
class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete()
if ( not tableName ): # May be "False"
self.logInfo("Cannot generate a table to delete, skipping...")
return
self.logInfo("Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table db.{}".format(tableName))
class AddDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
tIndex = ds.pickAndAllocateTable()
if ( tIndex == None ):
self.logInfo("No table found to add data, skipping...")
return
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
self.logDebug("[SQL] Executing SQL: {}".format(sql))
wt.execSql(sql)
ds.releaseTable(tIndex)
self.logDebug("[OPS] Finished adding data")
# Deterministic random number generator # Deterministic random number generator
class Dice(): class Dice():
seeded = False # static, uninitialized seeded = False # static, uninitialized
...@@ -1384,12 +1345,12 @@ def main(): ...@@ -1384,12 +1345,12 @@ def main():
# resetDb = False # DEBUG only # resetDb = False # DEBUG only
# dbState = DbState(resetDb) # DBEUG only! # dbState = DbState(resetDb) # DBEUG only!
dbState = DbState() # Regular function dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator( tc = ThreadCoordinator(
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), ThreadPool(gConfig.num_threads, gConfig.max_steps),
# WorkDispatcher(dbState), # Obsolete? # WorkDispatcher(dbState), # Obsolete?
dbState dbManager
) )
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
...@@ -1437,7 +1398,7 @@ def main(): ...@@ -1437,7 +1398,7 @@ def main():
tc.run() tc.run()
tc.logStats() tc.logStats()
dbState.cleanUp() dbManager.cleanUp()
# logger.info("Crash_Gen execution finished") # logger.info("Crash_Gen execution finished")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册