提交 fcbe6c15 编写于 作者: S Steven Li

Adjusted crash_gen to examine returned error codes

上级 b194f47d
......@@ -78,13 +78,22 @@ class WorkerThread:
if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn = DbConn()
self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg):
logger.debug(" TRD[{}] {}".format(self._tid, msg))
def logInfo(self, msg):
logger.info(" TRD[{}] {}".format(self._tid, msg))
def dbInUse(self):
return self._dbInUse
def useDb(self):
if ( not self._dbInUse ):
self.execSql("use db")
self._dbInUse = True
def getTaskExecutor(self):
return self._tc.getTaskExecutor()
......@@ -118,12 +127,17 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Fetch a task from the Thread Coordinator
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask()
# Execute such a task
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
task.execute(self)
tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps
def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ):
......@@ -163,6 +177,18 @@ class WorkerThread:
else:
return self._tc.getDbManager().getDbConn().execute(sql)
def querySql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
return self._dbConn.query(sql)
else:
return self._tc.getDbManager().getDbConn().query(sql)
def getQueryResult(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn.getQueryResult()
else:
return self._tc.getDbManager().getDbConn().getQueryResult()
def getDbConn(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn
......@@ -176,7 +202,7 @@ class WorkerThread:
# return self._tc.getDbState().getDbConn().query(sql)
class ThreadCoordinator:
def __init__(self, pool, dbManager):
def __init__(self, pool: ThreadPool, dbManager):
self._curStep = -1 # first step is 0
self._pool = pool
# self._wd = wd
......@@ -216,7 +242,16 @@ class ThreadCoordinator:
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
try:
self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state
sm = self._dbManager.getStateMachine()
logger.debug("[STT] starting transitions")
sm.transition(self._executedTasks) # at end of step, transiton the DB state
logger.debug("[STT] transition ended")
if sm.hasDatabase() :
for t in self._pool.threadList:
logger.debug("[DB] use db for all worker threads")
t.useDb()
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed")
......@@ -268,7 +303,7 @@ class ThreadCoordinator:
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value
for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
......@@ -306,7 +341,7 @@ class ThreadPool:
self.maxSteps = maxSteps
# Internal class variables
self.curStep = 0
self.threadList = []
self.threadList = [] # type: List[WorkerThread]
# starting to run all the threads, in locking steps
def createAndStartThreads(self, tc: ThreadCoordinator):
......@@ -412,7 +447,7 @@ class DbConn:
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # note we do this in _findCurrenState
# self._cursor.execute('use db') # do this at the beginning of every step
# Open connection
self._tdSql = TDSql()
......@@ -450,7 +485,7 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
......@@ -620,10 +655,10 @@ class StateDbOnly(AnyState):
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
# self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY
if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
if ( not self.hasTask(tasks, TaskDropSuperTable) ):
self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
# if ( not self.hasTask(tasks, TaskDropSuperTable) ):
# self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
......@@ -648,7 +683,9 @@ class StateSuperTableOnly(AnyState):
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
#self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it
# self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
......@@ -692,7 +729,7 @@ class StateHasData(AnyState):
self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
class StateMechine :
class StateMechine:
def __init__(self, dbConn):
self._dbConn = dbConn
self._curState = self._findCurrentState() # starting state
......@@ -701,8 +738,17 @@ class StateMechine :
def getCurrentState(self):
return self._curState
def hasDatabase(self):
return self._curState.canDropDb() # ha, can drop DB means it has one
# May be slow, use cautionsly...
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
def typesToStrings(types):
ss = []
for t in types:
ss.append(t.__name__)
return ss
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
firstTaskTypes = []
for tc in allTaskClasses:
......@@ -721,7 +767,7 @@ class StateMechine :
if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes)))
return taskTypes
def _findCurrentState(self):
......@@ -731,7 +777,7 @@ class StateMechine :
# logger.debug("Found EMPTY state")
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty()
dbc.execute("use db") # did not do this when openning connection
dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
if dbc.query("show tables") == 0 : # no tables
# logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
......@@ -747,6 +793,7 @@ class StateMechine :
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
......@@ -830,7 +877,7 @@ class DbManager():
def getDbConn(self):
return self._dbConn
def getStateMachine(self):
def getStateMachine(self) -> StateMechine :
return self._stateMachine
# def getState(self):
......@@ -931,6 +978,7 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats
self._lastSql = "" # last SQL executed/attempted
def isSuccess(self):
return self._err == None
......@@ -961,10 +1009,16 @@ class Task():
try:
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err))
self._err = err
errno2 = 0x80000000 + err.errno # positive error number
if ( errno2 in [0x200, 0x360, 0x362, 0x381, 0x380, 0x600 ]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
print("e", end="", flush=True)
self._err = err
else:
self.logDebug("[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
raise
except:
self.logDebug("[=] Unexpected exception")
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
......@@ -972,8 +1026,21 @@ class Task():
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
def execSql(self, sql):
self._lastSql = sql
return self._dbManager.execute(sql)
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.querySql(sql)
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
return wt.getQueryResult()
class ExecutionStats:
def __init__(self):
......@@ -1039,6 +1106,11 @@ class ExecutionStats:
class StateTransitionTask(Task):
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
@classmethod
def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected")
......@@ -1061,6 +1133,10 @@ class StateTransitionTask(Task):
# return state.getValue() in cls.getBeginStates()
raise RuntimeError("must be overriden")
@classmethod
def getRegTableName(cls, i):
return "db.reg_table_{}".format(i)
def execute(self, wt: WorkerThread):
super().execute(wt)
......@@ -1074,7 +1150,7 @@ class TaskCreateDb(StateTransitionTask):
return state.canCreateDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db")
self.execWtSql(wt, "create database db")
class TaskDropDb(StateTransitionTask):
@classmethod
......@@ -1086,7 +1162,7 @@ class TaskDropDb(StateTransitionTask):
return state.canDropDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db")
self.execWtSql(wt, "drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask):
......@@ -1099,8 +1175,13 @@ class TaskCreateSuperTable(StateTransitionTask):
return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
if not wt.dbInUse(): # no DB yet, to the best of our knowledge
logger.debug("Skipping task, no DB yet")
return
tblName = self._dbManager.getFixedSuperTableName()
# wt.execSql("use db") # should always be in place
self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
......@@ -1115,16 +1196,16 @@ class TaskReadData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn()
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
dbc.close()
dbc.open()
wt.getDbConn().close()
wt.getDbConn().open()
else:
rTables = dbc.getQueryResult()
rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult()
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
......@@ -1138,8 +1219,31 @@ class TaskDropSuperTable(StateTransitionTask):
return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
wt.execSql("drop table db.{}".format(tblName))
# 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
if Dice.throw(2) == 0 :
tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table"
for i in tblSeq:
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
try:
nRows = self.execWtSql(wt, "drop table {}".format(regTableName))
except taos.error.ProgrammingError as err:
errno2 = 0x80000000 + err.errno # positive error number
if ( errno2 in [0x362]) : # allowed errors
logger.debug("[DB] Acceptable error when dropping a table")
continue
if (not tickOutput):
tickOutput = True # Print only one time
if nRows >= 1 :
print("d", end="", flush=True)
else:
print("f({})".format(nRows), end="", flush=True)
# Drop the super table itself
tblName = self._dbManager.getFixedSuperTableName()
self.execWtSql(wt, "drop table db.{}".format(tblName))
class TaskAlterTags(StateTransitionTask):
@classmethod
......@@ -1154,20 +1258,18 @@ class TaskAlterTags(StateTransitionTask):
tblName = self._dbManager.getFixedSuperTableName()
dice = Dice.throw(4)
if dice == 0 :
wt.execSql("alter table db.{} add tag extraTag int".format(tblName))
sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1 :
wt.execSql("alter table db.{} drop tag extraTag".format(tblName))
sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2 :
wt.execSql("alter table db.{} drop tag newTag".format(tblName))
sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName))
sql = "alter table db.{} change tag extraTag newTag".format(tblName)
self.execWtSql(wt, sql)
class TaskAddData(StateTransitionTask):
activeTable : Set[int] = set() # Track which table is being actively worked on
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
# We use these two files to record operations to DB, useful for power-off tests
fAddLogReady = None
......@@ -1193,7 +1295,7 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq)
for i in tblSeq:
......@@ -1203,10 +1305,10 @@ class TaskAddData(StateTransitionTask):
print("x", end="", flush=True)
else:
self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table
nextInt = ds.getNextInt()
regTableName = "db.reg_table_{}".format(i)
nextInt = ds.getNextInt()
if gConfig.record_ops:
self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
......@@ -1217,7 +1319,7 @@ class TaskAddData(StateTransitionTask):
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt)
wt.execSql(sql)
self.execWtSql(wt, sql)
if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush()
......@@ -1390,7 +1492,7 @@ def main():
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
global logger
logger = logging.getLogger('CrashGen')
logger.addFilter(LoggingFilter())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册