diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 7d3eb959c06e8c3f57b6b7a738d487f7bf04aab7..7588e03e17cd04afa615adc7274dd363421f8112 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -161,6 +161,21 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Before we fetch the task and run it, let's ensure we properly "use" the database + try: + if (gConfig.per_thread_db_connection): # most likely TRUE + if not self._dbConn.isOpen: # might have been closed during server auto-restart + self._dbConn.open() + self.useDb() # might encounter exceptions. TODO: catch + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready + # ignore + dummy = 0 + else: + print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg)) + raise + # Fetch a task from the Thread Coordinator logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() @@ -324,10 +339,12 @@ class ThreadCoordinator: logger.debug("[STT] transition ended") # Due to limitation (or maybe not) of the Python library, # we cannot share connections across threads - if sm.hasDatabase(): - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() + # Here we are in main thread, we cannot operate the connections created in workers + # Moving below to task loop + # if sm.hasDatabase(): + # for t in self._pool.threadList: + # logger.debug("[DB] use db for all worker threads") + # t.useDb() # t.execSql("use db") # main thread executing "use # db" on behalf of every worker thread except taos.error.ProgrammingError as err: @@ -387,7 +404,7 @@ class ThreadCoordinator: transitionFailed = self._doTransition() # To start, we end step -1 first except taos.error.ProgrammingError as err: transitionFailed = True - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + errno2 = Helper.convertErrno(err.errno) # correct error scheme errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err) logger.info(errMsg) self._execStats.registerFailure(errMsg) @@ -468,6 +485,10 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. +class Helper: + @classmethod + def convertErrno(cls, errno): + return errno if (errno > 0) else 0x80000000 + errno class ThreadPool: def __init__(self, numThreads, maxSteps): @@ -613,8 +634,7 @@ class DbConn: def resetDb(self): # reset the whole database, etc. if (not self.isOpen): - raise RuntimeError( - "Cannot reset database until connection is open") + raise RuntimeError("Cannot reset database until connection is open") # self._tdSql.prepare() # Recreate database, etc. self.execute('drop database if exists db') @@ -681,8 +701,7 @@ class DbConnRest(DbConn): def close(self): if (not self.isOpen): - raise RuntimeError( - "Cannot clean up database until connection is open") + raise RuntimeError("Cannot clean up database until connection is open") # Do nothing for REST logger.debug("[DB] REST Database connection closed") self.isOpen = False @@ -747,27 +766,32 @@ class DbConnRest(DbConn): class MyTDSql: - def __init__(self): + def __init__(self, hostAddr, cfgPath): + # Make the DB connection + self._conn = taos.connect(host=hostAddr, config=cfgPath) + self._cursor = self._conn.cursor() + self.queryRows = 0 self.queryCols = 0 self.affectedRows = 0 - def init(self, cursor, log=True): - self.cursor = cursor + # def init(self, cursor, log=True): + # self.cursor = cursor # if (log): # caller = inspect.getframeinfo(inspect.stack()[1][0]) # self.cursor.log(caller.filename + ".sql") def close(self): - self.cursor.close() + self._conn.close() # TODO: very important, cursor close does NOT close DB connection! + self._cursor.close() def query(self, sql): self.sql = sql try: - self.cursor.execute(sql) - self.queryResult = self.cursor.fetchall() + self._cursor.execute(sql) + self.queryResult = self._cursor.fetchall() self.queryRows = len(self.queryResult) - self.queryCols = len(self.cursor.description) + self.queryCols = len(self._cursor.description) except Exception as e: # caller = inspect.getframeinfo(inspect.stack()[1][0]) # args = (caller.filename, caller.lineno, sql, repr(e)) @@ -778,7 +802,7 @@ class MyTDSql: def execute(self, sql): self.sql = sql try: - self.affectedRows = self.cursor.execute(sql) + self.affectedRows = self._cursor.execute(sql) except Exception as e: # caller = inspect.getframeinfo(inspect.stack()[1][0]) # args = (caller.filename, caller.lineno, sql, repr(e)) @@ -791,13 +815,13 @@ class DbConnNative(DbConn): # Class variables _lock = threading.Lock() _connInfoDisplayed = False + totalConnections = 0 # Not private def __init__(self): super().__init__() self._type = self.TYPE_NATIVE self._conn = None - self._cursor = None - + # self._cursor = None def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -814,7 +838,8 @@ class DbConnNative(DbConn): buildPath = root[:len(root) - len("/build/bin")] break if buildPath == None: - raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath)) + raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}" + .format(selfPath, projPath)) return buildPath @@ -822,33 +847,40 @@ class DbConnNative(DbConn): cfgPath = self.getBuildPath() + "/test/cfg" hostAddr = "127.0.0.1" - with self._lock: # force single threading for opening DB connections - if not self._connInfoDisplayed: - self.__class__._connInfoDisplayed = True # updating CLASS variable - logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) - - self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() + cls = self.__class__ # Get the class, to access class variables + with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! + if not cls._connInfoDisplayed: + cls._connInfoDisplayed = True # updating CLASS variable + logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + # Make the connection + # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable + # self._cursor = self._conn.cursor() + # Record the count in the class + self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection + cls.totalConnections += 1 - self._cursor.execute('reset query cache') + self._tdSql.execute('reset query cache') # self._cursor.execute('use db') # do this at the beginning of every # Open connection - self._tdSql = MyTDSql() - self._tdSql.init(self._cursor) - + # self._tdSql = MyTDSql() + # self._tdSql.init(self._cursor) + def close(self): if (not self.isOpen): - raise RuntimeError( - "Cannot clean up database until connection is open") + raise RuntimeError("Cannot clean up database until connection is open") self._tdSql.close() + # Decrement the class wide counter + cls = self.__class__ # Get the class, to access class variables + with cls._lock: + cls.totalConnections -= 1 + logger.debug("[DB] Database connection closed") self.isOpen = False def execute(self, sql): if (not self.isOpen): - raise RuntimeError( - "Cannot execute database commands until connection is open") + raise RuntimeError("Cannot execute database commands until connection is open") logger.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.execute(sql) @@ -1528,7 +1560,7 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + errno2 = Helper.convertErrno(err.errno) if (gConfig.continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) @@ -1678,9 +1710,8 @@ class ExecutionStats: logger.info( "| Total Elapsed Time (from wall clock): {:.3f} seconds".format( self._elapsedTime)) - logger.info( - "| Top numbers written: {}".format( - TaskExecutor.getBoundedList())) + logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) + logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections)) logger.info( "----------------------------------------------------------------------") @@ -1789,7 +1820,7 @@ class TdSuperTable: try: dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) raise @@ -1891,7 +1922,7 @@ class TaskReadData(StateTransitionTask): if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName())) except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) raise @@ -1920,9 +1951,8 @@ class TaskDropSuperTable(StateTransitionTask): self.execWtSql(wt, "drop table {}".format( regTableName)) # nRows always 0, like MySQL except taos.error.ProgrammingError as err: - # correcting for strange error number scheme - errno2 = err.errno if ( - err.errno > 0) else 0x80000000 + err.errno + # correcting for strange error number scheme + errno2 = Helper.convertErrno(err.errno) if (errno2 in [0x362]): # mnode invalid table name isSuccess = False logger.debug( @@ -2429,7 +2459,11 @@ class ServiceManagerThread: for line in iter(out.readline, b''): # print("Finished reading a line: {}".format(line)) # print("Adding item to queue...") - line = line.decode("utf-8").rstrip() + try: + line = line.decode("utf-8").rstrip() + except UnicodeError: + print("\nNon-UTF8 server output: {}\n".format(line)) + # This might block, and then causing "out" buffer to block queue.put(line) self._printProgress("_i") @@ -2455,7 +2489,7 @@ class ServiceManagerThread: def svcErrorReader(self, err: IO, queue): for line in iter(err.readline, b''): - print("\nTD Svc STDERR: {}".format(line)) + print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) class TdeSubProcess: