From ea0af95df24b424d6ab4d047ba3a8f8d68f02e9e Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 15 Sep 2020 09:04:00 +0000 Subject: [PATCH] Enhanced crash_gen tool to test against multiple databases concurrently, getting ready to test against clusters --- tests/pytest/crash_gen.py | 516 ++++++++++++++++++++++---------------- 1 file changed, 300 insertions(+), 216 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 1ea19dfac3..32290ab87f 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -53,14 +53,13 @@ except: if sys.version_info[0] < 3: raise Exception("Must be using Python 3") - # Global variables, tried to keep a small number. # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace gConfig = argparse.Namespace() # Dummy value, will be replaced later gSvcMgr = None # TODO: refactor this hack, use dep injection -logger = None +logger = None # type: Logger def runThread(wt: WorkerThread): wt.run() @@ -101,7 +100,7 @@ class WorkerThread: else: raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) - self._dbInUse = False # if "use db" was executed already + # self._dbInUse = False # if "use db" was executed already def logDebug(self, msg): logger.debug(" TRD[{}] {}".format(self._tid, msg)) @@ -109,13 +108,13 @@ class WorkerThread: def logInfo(self, msg): logger.info(" TRD[{}] {}".format(self._tid, msg)) - def dbInUse(self): - return self._dbInUse + # def dbInUse(self): + # return self._dbInUse - def useDb(self): - if (not self._dbInUse): - self.execSql("use db") - self._dbInUse = True + # def useDb(self): + # if (not self._dbInUse): + # self.execSql("use db") + # self._dbInUse = True def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -161,12 +160,12 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break - # Before we fetch the task and run it, let's ensure we properly "use" the database + # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: if (gConfig.per_thread_db_connection): # most likely TRUE if not self._dbConn.isOpen: # might have been closed during server auto-restart self._dbConn.open() - self.useDb() # might encounter exceptions. TODO: catch + # self.useDb() # might encounter exceptions. TODO: catch except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready @@ -181,14 +180,13 @@ class WorkerThread: task = tc.fetchTask() # Execute such a task - logger.debug( - "[TRD] Worker thread [{}] about to execute task: {}".format( + logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format( self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) - self._dbInUse = False # there may be changes between steps + # self._dbInUse = False # there may be changes between steps # print("_wtd", end=None) # worker thread died def verifyThreadSelf(self): # ensure we are called by this own thread @@ -237,7 +235,7 @@ class WorkerThread: def getQueryResult(self): return self.getDbConn().getQueryResult() - def getDbConn(self): + def getDbConn(self) -> DbConn : if (gConfig.per_thread_db_connection): return self._dbConn else: @@ -255,7 +253,7 @@ class WorkerThread: class ThreadCoordinator: WORKER_THREAD_TIMEOUT = 60 # one minute - def __init__(self, pool: ThreadPool, dbManager): + def __init__(self, pool: ThreadPool, dbManager: DbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd @@ -268,6 +266,7 @@ class ThreadCoordinator: self._pool.numThreads + 1) # one barrier for all threads self._execStats = ExecutionStats() self._runStatus = MainExec.STATUS_RUNNING + self._initDbs() def getTaskExecutor(self): return self._te @@ -332,12 +331,16 @@ class ThreadCoordinator: def _doTransition(self): transitionFailed = False try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - # at end of step, transiton the DB state - sm.transition(self._executedTasks) - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, + for x in self._dbs: + db = x # type: Database + sm = db.getStateMachine() + logger.debug("[STT] starting transitions for DB: {}".format(db.getName())) + # at end of step, transiton the DB state + tasksForDb = db.filterTasks(self._executedTasks) + sm.transition(tasksForDb, self.getDbManager().getDbConn()) + logger.debug("[STT] transition ended for DB: {}".format(db.getName())) + + # Due to limitation (or maybe not) of the TD Python library, # we cannot share connections across threads # Here we are in main thread, we cannot operate the connections created in workers # Moving below to task loop @@ -347,6 +350,7 @@ class ThreadCoordinator: # t.useDb() # t.execSql("use db") # main thread executing "use # db" on behalf of every worker thread + except taos.error.ProgrammingError as err: if (err.msg == 'network unavailable'): # broken DB connection logger.info("DB connection broken, execution failed") @@ -458,23 +462,34 @@ class ThreadCoordinator: def isRunning(self): return self._te is not None + def _initDbs(self): + ''' Initialize multiple databases, invoked at __ini__() time ''' + self._dbs = [] # type: List[Database] + dbc = self.getDbManager().getDbConn() + if gConfig.max_dbs == 0: + self._dbs.append(Database(0, dbc)) + else: + for i in range(gConfig.max_dbs): + self._dbs.append(Database(i, dbc)) + + def pickDatabase(self): + idxDb = 0 + if gConfig.max_dbs != 0 : + idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1 + db = self._dbs[idxDb] # type: Database + return db + def fetchTask(self) -> Task: + ''' The thread coordinator (that's us) is responsible for fetching a task + to be executed next. + ''' if (not self.isRunning()): # no task raise RuntimeError("Cannot fetch task when not running") - # return self._wd.pickTask() - # Alternatively, let's ask the DbState for the appropriate task - # dbState = self.getDbState() - # tasks = dbState.getTasksAtState() # TODO: create every time? - # nTasks = len(tasks) - # i = Dice.throw(nTasks) - # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) - # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. - # return tasks[i].clone() # TODO: still necessary? + # pick a task type for current state - taskType = self.getDbManager().getStateMachine().pickTaskType() - return taskType( - self.getDbManager(), - self._execStats) # create a task from it + db = self.pickDatabase() + taskType = db.getStateMachine().pickTaskType() # type: Task + return taskType(self._execStats, db) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -632,17 +647,6 @@ class DbConn: logger.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True - def resetDb(self): # reset the whole database, etc. - if (not self.isOpen): - raise RuntimeError("Cannot reset database until connection is open") - # self._tdSql.prepare() # Recreate database, etc. - - self.execute('drop database if exists db') - logger.debug("Resetting DB, dropped database") - # self._cursor.execute('create database db') - # self._cursor.execute('use db') - # tdSql.execute('show databases') - def queryScalar(self, sql) -> int: return self._queryAny(sql) @@ -662,16 +666,32 @@ class DbConn: def use(self, dbName): self.execute("use {}".format(dbName)) - def hasDatabases(self): - return self.query("show databases") > 1 # We now have a "log" database by default + def existsDatabase(self, dbName: str): + ''' Check if a certain database exists ''' + self.query("show databases") + dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation + # ret2 = dbName in dbs + # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName))) + return dbName in dbs # TODO: super weird type mangling seen, once here def hasTables(self): return self.query("show tables") > 0 def execute(self, sql): + ''' Return the number of rows affected''' raise RuntimeError("Unexpected execution, should be overriden") + def safeExecute(self, sql): + '''Safely execute any SQL query, returning True/False upon success/failure''' + try: + self.execute(sql) + return True # ignore num of results, return success + except taos.error.ProgrammingError as err: + return False # failed, for whatever TAOS reason + # Not possile to reach here, non-TAOS exception would have been thrown + def query(self, sql) -> int: # return num rows returned + ''' Return the number of rows affected''' raise RuntimeError("Unexpected execution, should be overriden") def openByType(self): @@ -922,7 +942,9 @@ class AnyState: STATE_VAL_IDX = 0 CAN_CREATE_DB = 1 - CAN_DROP_DB = 2 + # For below, if we can "drop the DB", but strictly speaking + # only "under normal circumstances", as we may override it with the -b option + CAN_DROP_DB = 2 CAN_CREATE_FIXED_SUPER_TABLE = 3 CAN_DROP_FIXED_SUPER_TABLE = 4 CAN_ADD_DATA = 5 @@ -935,6 +957,8 @@ class AnyState: # -1 hack to accomodate the STATE_INVALID case return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] + # Each sub state tells us the "info", about itself, so we can determine + # on things like canDropDB() def getInfo(self): raise RuntimeError("Must be overriden by child classes") @@ -961,6 +985,10 @@ class AnyState: return self._info[self.CAN_CREATE_DB] def canDropDb(self): + # If user requests to run up to a number of DBs, + # we'd then not do drop_db operations any more + if gConfig.max_dbs > 0 : + return False return self._info[self.CAN_DROP_DB] def canCreateFixedSuperTable(self): @@ -1145,13 +1173,16 @@ class StateHasData(AnyState): class StateMechine: - def __init__(self, dbConn): - self._dbConn = dbConn - self._curState = self._findCurrentState() # starting state - # transitition target probabilities, indexed with value of STATE_EMPTY, - # STATE_DB_ONLY, etc. + def __init__(self, db: Database): + self._db = db + # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. self._stateWeights = [1, 2, 10, 40] + def init(self, dbc: DbConn): # late initailization, don't save the dbConn + self._curState = self._findCurrentState(dbc) # starting state + logger.debug("Found Starting State: {}".format(self._curState)) + + # TODO: seems no lnoger used, remove? def getCurrentState(self): return self._curState @@ -1193,34 +1224,35 @@ class StateMechine: typesToStrings(taskTypes))) return taskTypes - def _findCurrentState(self): - dbc = self._dbConn + def _findCurrentState(self, dbc: DbConn): ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state - if not dbc.hasDatabases(): # no database?! + dbName =self._db.getName() + if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?! logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own - dbc.use("db") + dbc.use(dbName) if not dbc.hasTables(): # no tables logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() - sTable = DbManager.getFixedSuperTable() - if sTable.hasRegTables(dbc): # no regular tables + sTable = self._db.getFixedSuperTable() + if sTable.hasRegTables(dbc, dbName): # no regular tables logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) return StateSuperTableOnly() else: # has actual tables logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) return StateHasData() - def transition(self, tasks): + # We transition the system to a new state by examining the current state itself + def transition(self, tasks, dbc: DbConn): if (len(tasks) == 0): # before 1st step, or otherwise empty logger.debug("[STT] Starting State: {}".format(self._curState)) return # do nothing # this should show up in the server log, separating steps - self._dbConn.execute("show dnodes") + dbc.execute("show dnodes") # Generic Checks, first based on the start state if self._curState.canCreateDb(): @@ -1251,7 +1283,7 @@ class StateMechine: # if self._state.canReadData(): # Nothing for sure - newState = self._findCurrentState() + newState = self._findCurrentState(dbc) logger.debug("[STT] New DB state determined: {}".format(newState)) # can old state move to new state through the tasks? self._curState.verifyTasksToState(tasks, newState) @@ -1283,49 +1315,51 @@ class StateMechine: if rnd < 0: return i -# Manager of the Database Data/Connection +class Database: + ''' We use this to represent an actual TDengine database inside a service instance, + possibly in a cluster environment. + + For now we use it to manage state transitions in that database + ''' + def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc + self._dbNum = dbNum # we assign a number to databases, for our testing purpose + self._stateMachine = StateMechine(self) + self._stateMachine.init(dbc) -class DbManager(): - def __init__(self, resetDb=True): - self.tableNumQueue = LinearQueue() - # datetime.datetime(2019, 1, 1) # initial date time tick self._lastTick = self.setupLastTick() self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - # self.openDbServerConnection() - self._dbConn = DbConn.createNative() if ( - gConfig.connector_type == 'native') else DbConn.createRest() - try: - self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected - except taos.error.ProgrammingError as err: - # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) - if (err.msg == 'client disconnected'): # cannot open DB connection - print( - "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") - sys.exit(2) - else: - print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg)) - raise - except BaseException: - print("[=] Unexpected exception") - raise + def getStateMachine(self) -> StateMechine: + return self._stateMachine - if resetDb: - self._dbConn.resetDb() # drop and recreate DB + def getDbNum(self): + return self._dbNum - # Do this after dbConn is in proper shape - self._stateMachine = StateMechine(self._dbConn) + def getName(self): + return "db_{}".format(self._dbNum) - def getDbConn(self): - return self._dbConn + def filterTasks(self, inTasks: List[Task]): # Pick out those belonging to us + outTasks = [] + for task in inTasks: + if task.getDb().isSame(self): + outTasks.append(task) + return outTasks - def getStateMachine(self) -> StateMechine: - return self._stateMachine + def isSame(self, other): + return self._dbNum == other._dbNum + + def exists(self, dbc: DbConn): + return dbc.existsDatabase(self.getName()) - # def getState(self): - # return self._stateMachine.getCurrentState() + @classmethod + def getFixedSuperTableName(cls): + return "fs_table" + + @classmethod + def getFixedSuperTable(cls) -> TdSuperTable: + return TdSuperTable(cls.getFixedSuperTableName()) # We aim to create a starting time tick, such that, whenever we run our test here once # We should be able to safely create 100,000 records, which will not have any repeated time stamp @@ -1347,25 +1381,6 @@ class DbManager(): logger.info("Setting up TICKS to start from: {}".format(t4)) return t4 - def pickAndAllocateTable(self): # pick any table, and "use" it - return self.tableNumQueue.pickAndAllocate() - - def addTable(self): - with self._lock: - tIndex = self.tableNumQueue.push() - return tIndex - - @classmethod - def getFixedSuperTableName(cls): - return "fs_table" - - @classmethod - def getFixedSuperTable(cls): - return TdSuperTable(cls.getFixedSuperTableName()) - - def releaseTable(self, i): # return the table back, so others can use it - self.tableNumQueue.release(i) - def getNextTick(self): with self._lock: # prevent duplicate tick if Dice.throw(20) == 0: # 1 in 20 chance @@ -1389,6 +1404,55 @@ class DbManager(): # print("Float obtained: {}".format(ret)) return ret + +class DbManager(): + ''' This is a wrapper around DbConn(), to make it easier to use. + + TODO: rename this to DbConnManager + ''' + def __init__(self): + self.tableNumQueue = LinearQueue() # TODO: delete? + # self.openDbServerConnection() + self._dbConn = DbConn.createNative() if ( + gConfig.connector_type == 'native') else DbConn.createRest() + try: + self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected + except taos.error.ProgrammingError as err: + # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) + if (err.msg == 'client disconnected'): # cannot open DB connection + print( + "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") + sys.exit(2) + else: + print("Failed to connect to DB, errno = {}, msg: {}" + .format(Helper.convertErrno(err.errno), err.msg)) + raise + except BaseException: + print("[=] Unexpected exception") + raise + + # Do this after dbConn is in proper shape + # Moved to Database() + # self._stateMachine = StateMechine(self._dbConn) + + def getDbConn(self): + return self._dbConn + + # TODO: not used any more, to delete + def pickAndAllocateTable(self): # pick any table, and "use" it + return self.tableNumQueue.pickAndAllocate() + + # TODO: Not used any more, to delete + def addTable(self): + with self._lock: + tIndex = self.tableNumQueue.push() + return tIndex + + # Not used any more, to delete + def releaseTable(self, i): # return the table back, so others can use it + self.tableNumQueue.release(i) + + # TODO: not used any more, delete def getTableNameToDelete(self): tblNum = self.tableNumQueue.pop() # TODO: race condition! if (not tblNum): # maybe false @@ -1399,7 +1463,6 @@ class DbManager(): def cleanUp(self): self._dbConn.close() - class TaskExecutor(): class BoundedList: def __init__(self, size=10): @@ -1465,6 +1528,10 @@ class TaskExecutor(): class Task(): + ''' A generic "Task" to be executed. For now we decide that there is no + need to embed a DB connection here, we use whatever the Worker Thread has + instead. But a task is always associated with a DB + ''' taskSn = 100 @classmethod @@ -1473,10 +1540,9 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbManager: DbManager, execStats: ExecutionStats): - self._dbManager = dbManager + def __init__(self, execStats: ExecutionStats, db: Database): self._workerThread = None - self._err = None + self._err = None # type: Exception self._aborted = False self._curStep = None self._numRows = None # Number of rows affected @@ -1486,6 +1552,7 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats + self._db = db # A task is always associated/for a specific DB def isSuccess(self): return self._err is None @@ -1494,9 +1561,12 @@ class Task(): return self._aborted def clone(self): # TODO: why do we need this again? - newTask = self.__class__(self._dbManager, self._execStats) + newTask = self.__class__(self._execStats, self._db) return newTask + def getDb(self): + return self._db + def logDebug(self, msg): self._workerThread.logDebug( "Step[{}.{}] {}".format( @@ -1555,9 +1625,12 @@ class Task(): self.logDebug( "[-] executing task {}...".format(self.__class__.__name__)) - self._err = None + self._err = None # TODO: type hint mess up? self._execStats.beginTaskType(self.__class__.__name__) # mark beginning errno2 = None + + # Now pick a database, and stick with it for the duration of the task execution + dbName = self._db.getName() try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: @@ -1595,7 +1668,7 @@ class Task(): self._err = e self._aborted = True traceback.print_exc() - except BaseException: + except BaseException: # TODO: what is this again??!! self.logDebug( "[=] Unexpected exception, SQL: {}".format( wt.getDbConn().getLastSql())) @@ -1607,10 +1680,9 @@ class Task(): # TODO: merge with above. self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2) - def execSql(self, sql): - return self._dbManager.execute(sql) - + # TODO: refactor away, just provide the dbConn def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + """ Haha """ return wt.execSql(sql) def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread @@ -1762,9 +1834,10 @@ class TaskCreateDb(StateTransitionTask): def canBeginFrom(cls, state: AnyState): return state.canCreateDb() + # Actually creating the database(es) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - # self.execWtSql(wt, "create database db replica {}".format(Dice.throw(3)+1)) - self.execWtSql(wt, "create database db") + # was: self.execWtSql(wt, "create database db") + self.execWtSql(wt, "create database {}".format(self._db.getName())) class TaskDropDb(StateTransitionTask): @classmethod @@ -1776,10 +1849,9 @@ class TaskDropDb(StateTransitionTask): return state.canDropDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - self.execWtSql(wt, "drop database db") + self.execWtSql(wt, "drop database {}".format(self._db.getName())) logger.debug("[OPS] database dropped at {}".format(time.time())) - class TaskCreateSuperTable(StateTransitionTask): @classmethod def getEndState(cls): @@ -1790,13 +1862,14 @@ class TaskCreateSuperTable(StateTransitionTask): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - if not wt.dbInUse(): # no DB yet, to the best of our knowledge + if not self._db.exists(wt.getDbConn()): logger.debug("Skipping task, no DB yet") return - sTable = self._dbManager.getFixedSuperTable() + sTable = self._db.getFixedSuperTable() # type: TdSuperTable # wt.execSql("use db") # should always be in place - sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) + sTable.create(wt.getDbConn(), self._db.getName(), + {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically @@ -1809,17 +1882,20 @@ class TdSuperTable: def getName(self): return self._stName - def create(self, dbc, cols: dict, tags: dict): - sql = "CREATE TABLE db.{} ({}) TAGS ({})".format( + # TODO: odd semantic, create() method is usually static? + def create(self, dbc, dbName, cols: dict, tags: dict): + '''Creating a super table''' + sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format( + dbName, self._stName, ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]), ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) ) dbc.execute(sql) - def getRegTables(self, dbc: DbConn): + def getRegTables(self, dbc: DbConn, dbName: str): try: - dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later + dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) @@ -1828,20 +1904,20 @@ class TdSuperTable: qr = dbc.getQueryResult() return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation - def hasRegTables(self, dbc: DbConn): - return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0 + def hasRegTables(self, dbc: DbConn, dbName: str): + return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0 - def ensureTable(self, dbc: DbConn, regTableName: str): - sql = "select tbname from db.{} where tbname in ('{}')".format(self._stName, regTableName) + def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str): + sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already return - sql = "CREATE TABLE {} USING {} tags ({})".format( - regTableName, self._stName, self._getTagStrForSql(dbc) + sql = "CREATE TABLE {}.{} USING {}.{} tags ({})".format( + dbName, regTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName) ) dbc.execute(sql) - def _getTagStrForSql(self, dbc) : - tags = self._getTags(dbc) + def _getTagStrForSql(self, dbc, dbName: str) : + tags = self._getTags(dbc, dbName) tagStrs = [] for tagName in tags: tagType = tags[tagName] @@ -1855,34 +1931,34 @@ class TdSuperTable: raise RuntimeError("Unexpected tag type: {}".format(tagType)) return ", ".join(tagStrs) - def _getTags(self, dbc) -> dict: - dbc.query("DESCRIBE {}".format(self._stName)) + def _getTags(self, dbc, dbName) -> dict: + dbc.query("DESCRIBE {}.{}".format(dbName, self._stName)) stCols = dbc.getQueryResult() # print(stCols) ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type # print("Tags retrieved: {}".format(ret)) return ret - def addTag(self, dbc, tagName, tagType): - if tagName in self._getTags(dbc): # already + def addTag(self, dbc, dbName, tagName, tagType): + if tagName in self._getTags(dbc, dbName): # already return # sTable.addTag("extraTag", "int") - sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType) + sql = "alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType) dbc.execute(sql) - def dropTag(self, dbc, tagName): - if not tagName in self._getTags(dbc): # don't have this tag + def dropTag(self, dbc, dbName, tagName): + if not tagName in self._getTags(dbc, dbName): # don't have this tag return - sql = "alter table db.{} drop tag {}".format(self._stName, tagName) + sql = "alter table {}.{} drop tag {}".format(dbName, self._stName, tagName) dbc.execute(sql) - def changeTag(self, dbc, oldTag, newTag): - tags = self._getTags(dbc) + def changeTag(self, dbc, dbName, oldTag, newTag): + tags = self._getTags(dbc, dbName) if not oldTag in tags: # don't have this tag return if newTag in tags: # already have this tag return - sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag) + sql = "alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag) dbc.execute(sql) class TaskReadData(StateTransitionTask): @@ -1895,19 +1971,21 @@ class TaskReadData(StateTransitionTask): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTable = self._dbManager.getFixedSuperTable() + sTable = self._db.getFixedSuperTable() - if random.randrange( - 5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations + # 1 in 5 chance, simulate a broken connection. + if random.randrange(5) == 0: # TODO: break connection in all situations wt.getDbConn().close() wt.getDbConn().open() + print("_r", end="", flush=True) dbc = wt.getDbConn() - for rTbName in sTable.getRegTables(dbc): # regular tables + dbName = self._db.getName() + for rTbName in sTable.getRegTables(dbc, dbName): # regular tables aggExpr = Dice.choice([ - '*', - 'count(*)', - 'avg(speed)', + '*', + 'count(*)', + 'avg(speed)', # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable 'sum(speed)', 'stddev(speed)', @@ -1929,10 +2007,10 @@ class TaskReadData(StateTransitionTask): ]) try: # Run the query against the regular table first - dbc.execute("select {} from db.{}".format(aggExpr, rTbName)) + dbc.execute("select {} from {}.{}".format(aggExpr, dbName, rTbName)) # Then run it against the super table if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! - dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName())) + dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName())) except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) @@ -1948,27 +2026,25 @@ class TaskDropSuperTable(StateTransitionTask): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - # 1/2 chance, we'll drop the regular tables one by one, in a randomized - # sequence + # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence if Dice.throw(2) == 0: + # print("_7_", end="", flush=True) tblSeq = list(range( 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) random.shuffle(tblSeq) tickOutput = False # if we have spitted out a "d" character for "drop regular table" isSuccess = True for i in tblSeq: - regTableName = self.getRegTableName( - i) # "db.reg_table_{}".format(i) + regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) try: - self.execWtSql(wt, "drop table {}".format( - regTableName)) # nRows always 0, like MySQL + self.execWtSql(wt, "drop table {}.{}". + format(self._db.getName(), regTableName)) # nRows always 0, like MySQL except taos.error.ProgrammingError as err: # correcting for strange error number scheme errno2 = Helper.convertErrno(err.errno) if (errno2 in [0x362]): # mnode invalid table name isSuccess = False - logger.debug( - "[DB] Acceptable error when dropping a table") + logger.debug("[DB] Acceptable error when dropping a table") continue # try to delete next regular table if (not tickOutput): @@ -1979,8 +2055,8 @@ class TaskDropSuperTable(StateTransitionTask): print("f", end="", flush=True) # Drop the super table itself - tblName = self._dbManager.getFixedSuperTableName() - self.execWtSql(wt, "drop table db.{}".format(tblName)) + tblName = self._db.getFixedSuperTableName() + self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) class TaskAlterTags(StateTransitionTask): @@ -1995,19 +2071,20 @@ class TaskAlterTags(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # tblName = self._dbManager.getFixedSuperTableName() dbc = wt.getDbConn() - sTable = self._dbManager.getFixedSuperTable() + sTable = self._db.getFixedSuperTable() + dbName = self._db.getName() dice = Dice.throw(4) if dice == 0: - sTable.addTag(dbc, "extraTag", "int") + sTable.addTag(dbc, dbName, "extraTag", "int") # sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1: - sTable.dropTag(dbc, "extraTag") + sTable.dropTag(dbc, dbName, "extraTag") # sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2: - sTable.dropTag(dbc, "newTag") + sTable.dropTag(dbc, dbName, "newTag") # sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - sTable.changeTag(dbc, "extraTag", "newTag") + sTable.changeTag(dbc, dbName, "extraTag", "newTag") # sql = "alter table db.{} change tag extraTag newTag".format(tblName) class TaskRestartService(StateTransitionTask): @@ -2072,7 +2149,8 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access + # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access + db = self._db tblSeq = list(range( self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) @@ -2082,22 +2160,23 @@ class TaskAddData(StateTransitionTask): else: self.activeTable.add(i) # marking it active - sTable = ds.getFixedSuperTable() + sTable = db.getFixedSuperTable() regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) - sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists + sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table - nextInt = ds.getNextInt() + nextInt = db.getNextInt() if gConfig.record_ops: self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady) - sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) + sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {}) + db.getName(), regTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), - ds.getNextTick(), nextInt) + db.getNextTick(), nextInt) self.execWtSql(wt, sql) # Successfully wrote the data into the DB, let's record it # somehow @@ -2688,40 +2767,38 @@ class ClientManager: self.inSigHandler = False - def _printLastNumbers(self): # to verify data durability - dbManager = DbManager(resetDb=False) - dbc = dbManager.getDbConn() - if dbc.query("show databases") <= 1: # no database (we have a default called "log") - return - dbc.execute("use db") - if dbc.query("show tables") == 0: # no tables - return - - sTbName = dbManager.getFixedSuperTableName() - - # get all regular tables - # TODO: analyze result set later - dbc.query("select TBNAME from db.{}".format(sTbName)) - rTables = dbc.getQueryResult() - - bList = TaskExecutor.BoundedList() - for rTbName in rTables: # regular tables - dbc.query("select speed from db.{}".format(rTbName[0])) - numbers = dbc.getQueryResult() - for row in numbers: - # print("<{}>".format(n), end="", flush=True) - bList.add(row[0]) - - print("Top numbers in DB right now: {}".format(bList)) - print("TDengine client execution is about to start in 2 seconds...") - time.sleep(2.0) - dbManager = None # release? - - def prepare(self): - self._printLastNumbers() + # TODO: need to revise how we verify data durability + # def _printLastNumbers(self): # to verify data durability + # dbManager = DbManager() + # dbc = dbManager.getDbConn() + # if dbc.query("show databases") <= 1: # no database (we have a default called "log") + # return + # dbc.execute("use db") + # if dbc.query("show tables") == 0: # no tables + # return + + # sTbName = dbManager.getFixedSuperTableName() + + # # get all regular tables + # # TODO: analyze result set later + # dbc.query("select TBNAME from db.{}".format(sTbName)) + # rTables = dbc.getQueryResult() + + # bList = TaskExecutor.BoundedList() + # for rTbName in rTables: # regular tables + # dbc.query("select speed from db.{}".format(rTbName[0])) + # numbers = dbc.getQueryResult() + # for row in numbers: + # # print("<{}>".format(n), end="", flush=True) + # bList.add(row[0]) + + # print("Top numbers in DB right now: {}".format(bList)) + # print("TDengine client execution is about to start in 2 seconds...") + # time.sleep(2.0) + # dbManager = None # release? def run(self, svcMgr): - self._printLastNumbers() + # self._printLastNumbers() dbManager = DbManager() # Regular function thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) @@ -2876,6 +2953,13 @@ def main(): '--auto-start-service', action='store_true', help='Automatically start/stop the TDengine service (default: false)') + parser.add_argument( + '-b', + '--max-dbs', + action='store', + default=0, + type=int, + help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)') parser.add_argument( '-c', '--connector-type', -- GitLab