diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 21de44ef98c3d0d3a945fff4ed67a7f488ff7102..bd2038ca3abfbd9c34f8327e601bcae705a4d7e0 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -30,6 +30,8 @@ import time import logging import datetime import textwrap +import requests +from requests.auth import HTTPBasicAuth from typing import List from typing import Dict @@ -76,7 +78,8 @@ class WorkerThread: # Let us have a DB connection of our own if ( gConfig.per_thread_db_connection ): # type: ignore - self._dbConn = DbConn() + # print("connector_type = {}".format(gConfig.connector_type)) + self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest() self._dbInUse = False # if "use db" was executed already @@ -434,27 +437,39 @@ class LinearQueue(): return ret class DbConn: + TYPE_NATIVE = "native-c" + TYPE_REST = "rest-api" + TYPE_INVALID = "invalid" + + @classmethod + def create(cls, connType): + if connType == cls.TYPE_NATIVE: + return DbConnNative() + elif connType == cls.TYPE_REST: + return DbConnRest() + else: + raise RuntimeError("Unexpected connection type: {}".format(connType)) + + @classmethod + def createNative(cls): + return cls.create(cls.TYPE_NATIVE) + + @classmethod + def createRest(cls): + return cls.create(cls.TYPE_REST) + def __init__(self): - self._conn = None - self._cursor = None self.isOpen = False - - def open(self): # Open connection + self._type = self.TYPE_INVALID + + def open(self): if ( self.isOpen ): raise RuntimeError("Cannot re-open an existing DB connection") - cfgPath = "../../build/test/cfg" - self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() + # below implemented by child classes + self.openByType() - # Get the connection/cursor ready - self._cursor.execute('reset query cache') - # self._cursor.execute('use db') # do this at the beginning of every step - - # Open connection - self._tdSql = TDSql() - self._tdSql.init(self._cursor) - logger.debug("[DB] data connection opened") + logger.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True def resetDb(self): # reset the whole database, etc. @@ -462,13 +477,123 @@ class DbConn: raise RuntimeError("Cannot reset database until connection is open") # self._tdSql.prepare() # Recreate database, etc. - self._cursor.execute('drop database if exists db') + self.execute('drop database if exists db') logger.debug("Resetting DB, dropped database") # self._cursor.execute('create database db') # self._cursor.execute('use db') - # tdSql.execute('show databases') + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + nRows = self.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if self.getResultRows() != 1 or self.getResultCols() != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return self.getQueryResult()[0][0] + + def execute(self, sql): + raise RuntimeError("Unexpected execution, should be overriden") + def openByType(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getQueryResult(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultRows(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultCols(self): + raise RuntimeError("Unexpected execution, should be overriden") + +# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql +class DbConnRest(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._url = "http://localhost:6020/rest/sql" # fixed for now + self._result = None + + def openByType(self): # Open connection + pass # do nothing, always open + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + # Do nothing for REST + logger.debug("[DB] REST Database connection closed") + self.isOpen = False + + def _doSql(self, sql): + r = requests.post(self._url, + data = sql, + auth = HTTPBasicAuth('root', 'taosdata')) + rj = r.json() + # Sanity check for the "Json Result" + if (not 'status' in rj): + raise RuntimeError("No status in REST response") + + if rj['status'] == 'error': # clearly reported error + if (not 'code' in rj): # error without code + raise RuntimeError("REST error return without code") + errno = rj['code'] # May need to massage this in the future + # print("Raising programming error with REST return: {}".format(rj)) + raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc' + + if rj['status'] != 'succ': # better be this + raise RuntimeError("Unexpected REST return status: {}".format(rj['status'])) + + nRows = rj['rows'] if ('rows' in rj) else 0 + self._result = rj + return nRows + + def execute(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot execute database commands until connection is open") + logger.debug("[SQL-REST] Executing SQL: {}".format(sql)) + nRows = self._doSql(sql) + logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows + + def query(self, sql) : # return rows affected + return self.execute(sql) + + def getQueryResult(self): + return self._result['data'] + + def getResultRows(self): + print(self._result) + raise RuntimeError("TBD") + # return self._tdSql.queryRows + + def getResultCols(self): + print(self._result) + raise RuntimeError("TBD") + +class DbConnNative(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._conn = None + self._cursor = None + + def openByType(self): # Open connection + cfgPath = "../../build/test/cfg" + self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + + # Get the connection/cursor ready + self._cursor.execute('reset query cache') + # self._cursor.execute('use db') # do this at the beginning of every step + + # Open connection + self._tdSql = TDSql() + self._tdSql.init(self._cursor) + def close(self): if ( not self.isOpen ): raise RuntimeError("Cannot clean up database until connection is open") @@ -496,22 +621,12 @@ class DbConn: def getQueryResult(self): return self._tdSql.queryResult - def _queryAny(self, sql) : # actual query result as an int - if ( not self.isOpen ): - raise RuntimeError("Cannot query database until connection is open") - tSql = self._tdSql - nRows = tSql.query(sql) - if nRows != 1 : - raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) - if tSql.queryRows != 1 or tSql.queryCols != 1: - raise RuntimeError("Unexpected result set for query: {}".format(sql)) - return tSql.queryResult[0][0] + def getResultRows(self): + return self._tdSql.queryRows - def queryScalar(self, sql) -> int : - return self._queryAny(sql) + def getResultCols(self): + return self._tdSql.queryCols - def queryString(self, sql) -> str : - return self._queryAny(sql) class AnyState: STATE_INVALID = -1 @@ -859,7 +974,7 @@ class DbManager(): self._lock = threading.RLock() # self.openDbServerConnection() - self._dbConn = DbConn() + self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest() try: self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: @@ -1013,8 +1128,10 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = 0x80000000 + err.errno # positive error number - if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600 ]) : # allowed errors + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600, + 1000 # REST catch-all error + ]) : # allowed errors self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) print("_", end="", flush=True) self._err = err @@ -1239,8 +1356,8 @@ class TaskDropSuperTable(StateTransitionTask): regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) try: self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL - except taos.error.ProgrammingError as err: - errno2 = 0x80000000 + err.errno # positive error number + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme if ( errno2 in [0x362]) : # mnode invalid table name isSuccess = False logger.debug("[DB] Acceptable error when dropping a table") @@ -1400,11 +1517,6 @@ class LoggingFilter(logging.Filter): if ( record.levelno >= logging.INFO ) : return True # info or above always log - - - # print("type = {}, value={}".format(type(msg), msg)) - # sys.exit() - # Commenting out below to adjust... # if msg.startswith("[TRD]"): @@ -1490,6 +1602,8 @@ def main(): ''')) + parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, + help='Connector type to use: native, rest, or mixed (default: 10)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') parser.add_argument('-e', '--run-tdengine', action='store_true',