diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c88683aa097c82380503370d342101351b27d38b..bff2403bb8b00537fb25ceab93ddf34263dc1c17 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import os import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -32,6 +33,7 @@ import textwrap from typing import List from typing import Dict +from typing import Set from util.log import * from util.dnodes import * @@ -42,7 +44,10 @@ import crash_gen import taos # Global variables, tried to keep a small number. -gConfig = None # Command-line/Environment Configurations, will set a bit later + +# Command-line/Environment Configurations, will set a bit later +# ConfigNameSpace = argparse.Namespace +gConfig = argparse.Namespace() # Dummy value, will be replaced later logger = None def runThread(wt: WorkerThread): @@ -64,7 +69,7 @@ class WorkerThread: # self._curStep = -1 self._pool = pool self._tid = tid - self._tc = tc + self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() @@ -156,13 +161,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState().getDbConn().execute(sql) + return self._tc.getDbManager().getDbConn().execute(sql) def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn else: - return self._tc.getDbState().getDbConn() + return self._tc.getDbManager().getDbConn() # def querySql(self, sql): # not "execute", since we are out side the DB context # if ( gConfig.per_thread_db_connection ): @@ -171,12 +176,12 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbState): + def __init__(self, pool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbState = dbState + self._dbManager = dbManager self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -186,8 +191,8 @@ class ThreadCoordinator: def getTaskExecutor(self): return self._te - def getDbState(self) -> DbState : - return self._dbState + def getDbManager(self) -> DbManager : + return self._dbManager def crossStepBarrier(self): self._stepBarrier.wait() @@ -211,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -284,8 +289,8 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbState().pickTaskType() # pick a task type for current state - return taskType(self.getDbState(), self._execStats) # create a task from it + taskType = self.getDbManager().pickTaskType() # pick a task type for current state + return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -296,16 +301,12 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. class ThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + def __init__(self, numThreads, maxSteps): self.numThreads = numThreads self.maxSteps = maxSteps - self.funcSequencer = funcSequencer # Internal class variables - # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -319,7 +320,8 @@ class ThreadPool: logger.debug("Joining thread...") workerThread._thread.join() -# A queue of continguous POSITIVE integers +# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers +# for new table names class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element @@ -595,9 +597,9 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB - if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks - self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers class StateDbOnly(AnyState): def getInfo(self): @@ -609,20 +611,20 @@ class StateDbOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( not self.hasTask(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more - self.assertIfExistThenSuccess(tasks, DropDbTask) + if ( not self.hasTask(tasks, TaskCreateDb) ): + self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more + self.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): - self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything - self.assertNoTask(tasks, DropDbTask) # should have have tried + if ( not self.hasTask(tasks, TaskDropSuperTable) ): + self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything + # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail # self._state = self.STATE_TABLE_ONLY @@ -645,8 +647,8 @@ class StateSuperTableOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -670,26 +672,28 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): - self.hasSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + self.hasSuccess(tasks, TaskDropDb) + if ( not self.hasTask(tasks, TaskCreateDb) ) : + self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only - if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task - self.assertNoTask(tasks, DropDbTask) # we must have drop_db task - self.hasSuccess(tasks, DropFixedSuperTableTask) + if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task + self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task + self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertNoTask(tasks, AddFixedDataTask) + self.assertNoTask(tasks, TaskDropDb) + self.assertNoTask(tasks, TaskDropSuperTable) + self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) - else: - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + else: # should be STATE_HAS_DATA + self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table + self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it + # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -# State of the database as we believe it to be -class DbState(): +# Manager of the Database Data/Connection +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() @@ -872,11 +876,11 @@ class DbState(): # Generic Checks, first based on the start state if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, DropDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop # if self._state.canCreateFixedTable(): @@ -923,8 +927,8 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): - self._dbState = dbState + def __init__(self, dbManager: DbManager, execStats: ExecutionStats): + self._dbState = dbManager self._workerThread = None self._err = None self._curStep = None @@ -966,7 +970,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + self._err = err except: self.logDebug("[=] Unexpected exception") raise @@ -1068,7 +1072,7 @@ class StateTransitionTask(Task): -class CreateDbTask(StateTransitionTask): +class TaskCreateDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1083,7 +1087,7 @@ class CreateDbTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(StateTransitionTask): +class TaskDropDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1099,7 +1103,7 @@ class DropDbTask(StateTransitionTask): wt.execSql("drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) -class CreateFixedSuperTableTask(StateTransitionTask): +class TaskCreateSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1117,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask): # No need to create the regular tables, INSERT will do that automatically -class ReadFixedDataTask(StateTransitionTask): +class TaskReadData(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1133,14 +1137,18 @@ class ReadFixedDataTask(StateTransitionTask): sTbName = self._dbState.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later - rTables = dbc.getQueryResult() - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations + dbc.close() + dbc.open() + else: + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure # tdSql.query(" cars where tbname in ('carzero', 'carone')") -class DropFixedSuperTableTask(StateTransitionTask): +class TaskDropSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1156,7 +1164,27 @@ class DropFixedSuperTableTask(StateTransitionTask): tblName = self._dbState.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) -class AddFixedDataTask(StateTransitionTask): +class TaskAddData(StateTransitionTask): + activeTable : Set[int] = set() # Track which table is being actively worked on + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None + fAddLogDone = None + + @classmethod + def prepToRecordOps(cls): + if gConfig.record_ops : + if ( cls.fAddLogReady == None ): + logger.info("Recording in a file operations to be performed...") + cls.fAddLogReady = open("add_log_ready.txt", "w") + if ( cls.fAddLogDone == None ): + logger.info("Recording in a file operations completed...") + cls.fAddLogDone = open("add_log_done.txt", "w") + @classmethod def getInfo(cls): return [ @@ -1171,50 +1199,35 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(10): # 0 to 9 - for j in range(10) : - sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( - i, + tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) + random.shuffle(tblSeq) + for i in tblSeq: + if ( i in self.activeTable ): # wow already active + # logger.info("Concurrent data insertion into table: {}".format(i)) + # print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table + print("x", end="", flush=True) + else: + self.activeTable.add(i) # marking it active + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table + nextInt = ds.getNextInt() + regTableName = "db.reg_table_{}".format(i) + if gConfig.record_ops: + self.prepToRecordOps() + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady) + sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + regTableName, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), - ds.getNextTick(), ds.getNextInt()) + ds.getNextTick(), nextInt) wt.execSql(sql) - - -#---------- Non State-Transition Related Tasks ----------# - -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) - -class DropTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self._dbState.getTableNameToDelete() - if ( not tableName ): # May be "False" - self.logInfo("Cannot generate a table to delete, skipping...") - return - self.logInfo("Dropping a table db.{} ...".format(tableName)) - wt.execSql("drop table db.{}".format(tableName)) - - - -class AddDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) - tIndex = ds.pickAndAllocateTable() - if ( tIndex == None ): - self.logInfo("No table found to add data, skipping...") - return - sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("[SQL] Executing SQL: {}".format(sql)) - wt.execSql(sql) - ds.releaseTable(tIndex) - self.logDebug("[OPS] Finished adding data") + if gConfig.record_ops: + self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone) + self.activeTable.discard(i) # not raising an error, unlike remove # Deterministic random number generator @@ -1301,10 +1314,14 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-l', '--larger-data', action='store_true', + help='Write larger amount of data during write operations (default: false)') + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') + parser.add_argument('-r', '--record-ops', action='store_true', + help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, help='Maximum number of steps to run (default: 100)') parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, @@ -1328,12 +1345,12 @@ def main(): # resetDb = False # DEBUG only # dbState = DbState(resetDb) # DBEUG only! - dbState = DbState() # Regular function + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( - ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + ThreadPool(gConfig.num_threads, gConfig.max_steps), # WorkDispatcher(dbState), # Obsolete? - dbState + dbManager ) # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix @@ -1381,7 +1398,7 @@ def main(): tc.run() tc.logStats() - dbState.cleanUp() + dbManager.cleanUp() # logger.info("Crash_Gen execution finished")