diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9e38e04b6392edf74a2d9c9648c5a8b7637bbe2a..d9c6716c4b50aff5d499af268f1a9e8affd68e84 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -594,21 +594,29 @@ class DbConn: def _queryAny(self, sql): # actual query result as an int if (not self.isOpen): - raise RuntimeError( - "Cannot query database until connection is open") + raise RuntimeError("Cannot query database until connection is open") nRows = self.query(sql) if nRows != 1: - raise RuntimeError( - "Unexpected result for query: {}, rows = {}".format( - sql, nRows)) + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) if self.getResultRows() != 1 or self.getResultCols() != 1: - raise RuntimeError( - "Unexpected result set for query: {}".format(sql)) + raise RuntimeError("Unexpected result set for query: {}".format(sql)) return self.getQueryResult()[0][0] + def use(self, dbName): + self.execute("use {}".format(dbName)) + + def hasDatabases(self): + return self.query("show databases") > 0 + + def hasTables(self): + return self.query("show tables") > 0 + def execute(self, sql): raise RuntimeError("Unexpected execution, should be overriden") + def query(self, sql) -> int: # return num rows returned + raise RuntimeError("Unexpected execution, should be overriden") + def openByType(self): raise RuntimeError("Unexpected execution, should be overriden") @@ -742,11 +750,16 @@ class MyTDSql: class DbConnNative(DbConn): + # Class variables + _lock = threading.Lock() + _connInfoDisplayed = False + def __init__(self): super().__init__() self._type = self.TYPE_NATIVE self._conn = None self._cursor = None + def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -763,23 +776,23 @@ class DbConnNative(DbConn): break return buildPath - connInfoDisplayed = False + def openByType(self): # Open connection cfgPath = self.getBuildPath() + "/test/cfg" hostAddr = "127.0.0.1" - if not self.connInfoDisplayed: - logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) - self.connInfoDisplayed = True - self._conn = taos.connect( - host=hostAddr, - config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() - - # Get the connection/cursor ready + with self._lock: # force single threading for opening DB connections + if not self._connInfoDisplayed: + self.__class__._connInfoDisplayed = True # updating CLASS variable + logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + + self._conn = taos.connect( + host=hostAddr, + config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + self._cursor.execute('reset query cache') # self._cursor.execute('use db') # do this at the beginning of every - # step # Open connection self._tdSql = MyTDSql() @@ -1128,33 +1141,22 @@ class StateMechine: def _findCurrentState(self): dbc = self._dbConn ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state - if dbc.query("show databases") == 0: # no database?! - # logger.debug("Found EMPTY state") - logger.debug( - "[STT] empty database found, between {} and {}".format( - ts, time.time())) + if not dbc.hasDatabases(): # no database?! + logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own - dbc.execute("use db") - if dbc.query("show tables") == 0: # no tables - # logger.debug("Found DB ONLY state") - logger.debug( - "[STT] DB_ONLY found, between {} and {}".format( - ts, time.time())) + dbc.use("db") + if not dbc.hasTables(): # no tables + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() - if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) - ) == 0: # no regular tables - # logger.debug("Found TABLE_ONLY state") - logger.debug( - "[STT] SUPER_TABLE_ONLY found, between {} and {}".format( - ts, time.time())) + + sTable = DbManager.getFixedSuperTable() + if sTable.hasRegTables(dbc): # no regular tables + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) return StateSuperTableOnly() else: # has actual tables - # logger.debug("Found HAS_DATA state") - logger.debug( - "[STT] HAS_DATA found, between {} and {}".format( - ts, time.time())) + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) return StateHasData() def transition(self, tasks): @@ -1300,6 +1302,10 @@ class DbManager(): def getFixedSuperTableName(cls): return "fs_table" + @classmethod + def getFixedSuperTable(cls): + return TdSuperTable(cls.getFixedSuperTableName()) + def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) @@ -1322,7 +1328,9 @@ class DbManager(): self.getNextInt()) def getNextFloat(self): - return 0.9 + self.getNextInt() + ret = 0.9 + self.getNextInt() + # print("Float obtained: {}".format(ret)) + return ret def getTableNameToDelete(self): tblNum = self.tableNumQueue.pop() # TODO: race condition! @@ -1446,6 +1454,30 @@ class Task(): "To be implemeted by child classes, class name: {}".format( self.__class__.__name__)) + def _isErrAcceptable(self, errno, msg): + if errno in [ + 0x05, # TSDB_CODE_RPC_NOT_READY + # 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x360, 0x362, + 0x369, # tag already exists + 0x36A, 0x36B, 0x36D, + 0x381, + 0x380, # "db not selected" + 0x383, + 0x386, # DB is being dropped?! + 0x503, + 0x510, # vnode not in ready state + 0x600, + 1000 # REST catch-all error + ]: + return True # These are the ALWAYS-ACCEPTABLE ones + elif errno == 0x200 : # invalid SQL, we need to div in a bit more + if msg.find("invalid column name") != -1: + return True + + return False # Not an acceptable error + + def execute(self, wt: WorkerThread): wt.verifyThreadSelf() self._workerThread = wt # type: ignore @@ -1464,26 +1496,15 @@ class Task(): errno2 = err.errno if ( err.errno > 0) else 0x80000000 + err.errno # correct error scheme if (gConfig.continue_on_exception): # user choose to continue - self.logDebug( - "[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( + self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, self._lastSql)) self._err = err - elif (errno2 in [ - 0x05, # TSDB_CODE_RPC_NOT_READY - 0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, - 0x381, 0x380, 0x383, - 0x386, # DB is being dropped?! - 0x503, - 0x510, # vnode not in ready state - 0x600, - 1000 # REST catch-all error - ]): # allowed errors - self.logDebug( - "[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( + elif self._isErrAcceptable(errno2, err.__str__()): + self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, self._lastSql)) print("_", end="", flush=True) self._err = err - else: + else: # not an acceptable error errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, self._lastSql) self.logDebug(errMsg) @@ -1705,6 +1726,34 @@ class TaskCreateSuperTable(StateTransitionTask): # automatically +class TdSuperTable: + def __init__(self, stName): + self._stName = stName + + def getRegTables(self, dbc: DbConn): + try: + dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) + raise + + qr = dbc.getQueryResult() + return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation + + def hasRegTables(self, dbc: DbConn): + return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0 + + def ensureTable(self, dbc: DbConn, regTableName: str): + sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName) + if dbc.query(sql) >= 1 : # reg table exists already + return + sql = "CREATE TABLE {} USING {} tags ('{}', {})".format( + regTableName, self._stName, + 'xyz', '33' + ) + dbc.execute(sql) + class TaskReadData(StateTransitionTask): @classmethod def getEndState(cls): @@ -1715,23 +1764,24 @@ class TaskReadData(StateTransitionTask): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTbName = self._dbManager.getFixedSuperTableName() - self.queryWtSql(wt, "select TBNAME from db.{}".format( - sTbName)) # TODO: analyze result set later + sTable = self._dbManager.getFixedSuperTable() if random.randrange( 5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations wt.getDbConn().close() wt.getDbConn().open() - else: - # wt.getDbConn().getQueryResult() - rTables = self.getQueryResult(wt) - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables: # regular tables - self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) - - # tdSql.query(" cars where tbname in ('carzero', 'carone')") - + + for rTbName in sTable.getRegTables(self._dbManager.getDbConn()): # regular tables + aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)', + # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable + 'sum(speed)', 'stddev(speed)', + 'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)']) # TODO: add more from 'top' + try: + self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName)) + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + raise class TaskDropSuperTable(StateTransitionTask): @classmethod @@ -1834,38 +1884,30 @@ class TaskAddData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbManager - # wt.execSql("use db") # TODO: seems to be an INSERT bug to require - # this - tblSeq = list( - range( + tblSeq = list(range( self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: if (i in self.activeTable): # wow already active - # logger.info("Concurrent data insertion into table: {}".format(i)) - # print("ct({})".format(i), end="", flush=True) # Concurrent - # insertion into table - print("x", end="", flush=True) + print("x", end="", flush=True) # concurrent insertion else: self.activeTable.add(i) # marking it active - # No need to shuffle data sequence, unless later we decide to do - # non-increment insertion - regTableName = self.getRegTableName( - i) # "db.reg_table_{}".format(i) - for j in range( - self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table + + sTable = ds.getFixedSuperTable() + regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) + sTable.ensureTable(ds.getDbConn(), regTableName) # Ensure the table exists + + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table nextInt = ds.getNextInt() if gConfig.record_ops: self.prepToRecordOps() - self.fAddLogReady.write( - "Ready to write {} to {}\n".format( - nextInt, regTableName)) + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady) - sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) regTableName, - ds.getFixedSuperTableName(), - ds.getNextBinary(), ds.getNextFloat(), + # ds.getFixedSuperTableName(), + # ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) self.execWtSql(wt, sql) # Successfully wrote the data into the DB, let's record it @@ -1912,6 +1954,10 @@ class Dice(): raise RuntimeError("Cannot throw dice before seeding it") return random.randrange(start, stop) + @classmethod + def choice(cls, cList): + return random.choice(cList) + class LoggingFilter(logging.Filter): def filter(self, record: logging.LogRecord):