diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 916e8904ff6f0f7a9f0cbbb306ee5823d6281334..94ad63697cc6dbc946ece1c0c34b839ea3a001c7 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -78,13 +78,22 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn = DbConn() + self._dbInUse = False # if "use db" was executed already + def logDebug(self, msg): logger.debug(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): logger.info(" TRD[{}] {}".format(self._tid, msg)) - + def dbInUse(self): + return self._dbInUse + + def useDb(self): + if ( not self._dbInUse ): + self.execSql("use db") + self._dbInUse = True + def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -118,12 +127,17 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Fetch a task from the Thread Coordinator logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + + # Execute such a task logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) + + self._dbInUse = False # there may be changes between steps def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -163,6 +177,18 @@ class WorkerThread: else: return self._tc.getDbManager().getDbConn().execute(sql) + def querySql(self, sql): # TODO: expose DbConn directly + if ( gConfig.per_thread_db_connection ): + return self._dbConn.query(sql) + else: + return self._tc.getDbManager().getDbConn().query(sql) + + def getQueryResult(self): + if ( gConfig.per_thread_db_connection ): + return self._dbConn.getQueryResult() + else: + return self._tc.getDbManager().getDbConn().getQueryResult() + def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn @@ -176,7 +202,7 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbManager): + def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd @@ -216,7 +242,16 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + sm.transition(self._executedTasks) # at end of step, transiton the DB state + logger.debug("[STT] transition ended") + if sm.hasDatabase() : + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -268,7 +303,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) + logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -306,7 +341,7 @@ class ThreadPool: self.maxSteps = maxSteps # Internal class variables self.curStep = 0 - self.threadList = [] + self.threadList = [] # type: List[WorkerThread] # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -412,7 +447,7 @@ class DbConn: # Get the connection/cursor ready self._cursor.execute('reset query cache') - # self._cursor.execute('use db') # note we do this in _findCurrenState + # self._cursor.execute('use db') # do this at the beginning of every step # Open connection self._tdSql = TDSql() @@ -450,7 +485,7 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") logger.debug("[SQL] Executing SQL: {}".format(sql)) nRows = self._tdSql.query(sql) - logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql)) return nRows # results are in: return self._tdSql.queryResult @@ -620,10 +655,10 @@ class StateDbOnly(AnyState): # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, TaskDropSuperTable) ): - self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything + # if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success + # # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + # if ( not self.hasTask(tasks, TaskDropSuperTable) ): + # self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -648,7 +683,9 @@ class StateSuperTableOnly(AnyState): def verifyTasksToState(self, tasks, newState): if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it + # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -692,7 +729,7 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -class StateMechine : +class StateMechine: def __init__(self, dbConn): self._dbConn = dbConn self._curState = self._findCurrentState() # starting state @@ -701,8 +738,17 @@ class StateMechine : def getCurrentState(self): return self._curState + def hasDatabase(self): + return self._curState.canDropDb() # ha, can drop DB means it has one + # May be slow, use cautionsly... def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + def typesToStrings(types): + ss = [] + for t in types: + ss.append(t.__name__) + return ss + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks firstTaskTypes = [] for tc in allTaskClasses: @@ -721,7 +767,7 @@ class StateMechine : if len(taskTypes) <= 0: raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes))) return taskTypes def _findCurrentState(self): @@ -731,7 +777,7 @@ class StateMechine : # logger.debug("Found EMPTY state") logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() - dbc.execute("use db") # did not do this when openning connection + dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own if dbc.query("show tables") == 0 : # no tables # logger.debug("Found DB ONLY state") logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) @@ -747,6 +793,7 @@ class StateMechine : def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + logger.debug("[STT] Starting State: {}".format(self._curState)) return # do nothing self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps @@ -830,7 +877,7 @@ class DbManager(): def getDbConn(self): return self._dbConn - def getStateMachine(self): + def getStateMachine(self) -> StateMechine : return self._stateMachine # def getState(self): @@ -931,6 +978,7 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats + self._lastSql = "" # last SQL executed/attempted def isSuccess(self): return self._err == None @@ -961,10 +1009,16 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + errno2 = 0x80000000 + err.errno # positive error number + if ( errno2 in [0x200, 0x360, 0x362, 0x381, 0x380, 0x600 ]) : # allowed errors + self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + print("e", end="", flush=True) + self._err = err + else: + self.logDebug("[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + raise except: - self.logDebug("[=] Unexpected exception") + self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) @@ -972,8 +1026,21 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): + self._lastSql = sql return self._dbManager.execute(sql) + def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.execSql(sql) + + def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.querySql(sql) + + def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread + return wt.getQueryResult() + + class ExecutionStats: def __init__(self): @@ -1039,6 +1106,11 @@ class ExecutionStats: class StateTransitionTask(Task): + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") @@ -1061,6 +1133,10 @@ class StateTransitionTask(Task): # return state.getValue() in cls.getBeginStates() raise RuntimeError("must be overriden") + @classmethod + def getRegTableName(cls, i): + return "db.reg_table_{}".format(i) + def execute(self, wt: WorkerThread): super().execute(wt) @@ -1074,7 +1150,7 @@ class TaskCreateDb(StateTransitionTask): return state.canCreateDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("create database db") + self.execWtSql(wt, "create database db") class TaskDropDb(StateTransitionTask): @classmethod @@ -1086,7 +1162,7 @@ class TaskDropDb(StateTransitionTask): return state.canDropDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("drop database db") + self.execWtSql(wt, "drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) class TaskCreateSuperTable(StateTransitionTask): @@ -1099,8 +1175,13 @@ class TaskCreateSuperTable(StateTransitionTask): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) + if not wt.dbInUse(): # no DB yet, to the best of our knowledge + logger.debug("Skipping task, no DB yet") + return + + tblName = self._dbManager.getFixedSuperTableName() + # wt.execSql("use db") # should always be in place + self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically @@ -1115,16 +1196,16 @@ class TaskReadData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): sTbName = self._dbManager.getFixedSuperTableName() - dbc = wt.getDbConn() - dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations - dbc.close() - dbc.open() + wt.getDbConn().close() + wt.getDbConn().open() else: - rTables = dbc.getQueryResult() + rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult() # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) # tdSql.query(" cars where tbname in ('carzero', 'carone')") @@ -1138,8 +1219,31 @@ class TaskDropSuperTable(StateTransitionTask): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("drop table db.{}".format(tblName)) + # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence + if Dice.throw(2) == 0 : + tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) + random.shuffle(tblSeq) + tickOutput = False # if we have spitted out a "d" character for "drop regular table" + for i in tblSeq: + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) + try: + nRows = self.execWtSql(wt, "drop table {}".format(regTableName)) + except taos.error.ProgrammingError as err: + errno2 = 0x80000000 + err.errno # positive error number + if ( errno2 in [0x362]) : # allowed errors + logger.debug("[DB] Acceptable error when dropping a table") + continue + + if (not tickOutput): + tickOutput = True # Print only one time + if nRows >= 1 : + print("d", end="", flush=True) + else: + print("f({})".format(nRows), end="", flush=True) + + # Drop the super table itself + tblName = self._dbManager.getFixedSuperTableName() + self.execWtSql(wt, "drop table db.{}".format(tblName)) class TaskAlterTags(StateTransitionTask): @classmethod @@ -1154,20 +1258,18 @@ class TaskAlterTags(StateTransitionTask): tblName = self._dbManager.getFixedSuperTableName() dice = Dice.throw(4) if dice == 0 : - wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1 : - wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2 : - wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + sql = "alter table db.{} change tag extraTag newTag".format(tblName) + + self.execWtSql(wt, sql) class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on - LARGE_NUMBER_OF_TABLES = 35 - SMALL_NUMBER_OF_TABLES = 3 - LARGE_NUMBER_OF_RECORDS = 50 - SMALL_NUMBER_OF_RECORDS = 3 # We use these two files to record operations to DB, useful for power-off tests fAddLogReady = None @@ -1193,7 +1295,7 @@ class TaskAddData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbManager - wt.execSql("use db") # TODO: seems to be an INSERT bug to require this + # wt.execSql("use db") # TODO: seems to be an INSERT bug to require this tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: @@ -1203,10 +1305,10 @@ class TaskAddData(StateTransitionTask): print("x", end="", flush=True) else: self.activeTable.add(i) # marking it active - # No need to shuffle data sequence, unless later we decide to do non-increment insertion + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table - nextInt = ds.getNextInt() - regTableName = "db.reg_table_{}".format(i) + nextInt = ds.getNextInt() if gConfig.record_ops: self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) @@ -1217,7 +1319,7 @@ class TaskAddData(StateTransitionTask): ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) - wt.execSql(sql) + self.execWtSql(wt, sql) if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() @@ -1390,7 +1492,7 @@ def main(): # if len(sys.argv) == 1: # parser.print_help() # sys.exit() - + global logger logger = logging.getLogger('CrashGen') logger.addFilter(LoggingFilter())