提交 6b45bc66 编写于 作者: S Steven Li

Temp check in for crash_gen, crashing at tsclient.h:435

上级 3ae15544
...@@ -594,21 +594,29 @@ class DbConn: ...@@ -594,21 +594,29 @@ class DbConn:
def _queryAny(self, sql): # actual query result as an int def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError("Cannot query database until connection is open")
"Cannot query database until connection is open")
nRows = self.query(sql) nRows = self.query(sql)
if nRows != 1: if nRows != 1:
raise RuntimeError( raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
"Unexpected result for query: {}, rows = {}".format(
sql, nRows))
if self.getResultRows() != 1 or self.getResultCols() != 1: if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError( raise RuntimeError("Unexpected result set for query: {}".format(sql))
"Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0] return self.getQueryResult()[0][0]
def use(self, dbName):
self.execute("use {}".format(dbName))
def hasDatabases(self):
return self.query("show databases") > 0
def hasTables(self):
return self.query("show tables") > 0
def execute(self, sql): def execute(self, sql):
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
def query(self, sql) -> int: # return num rows returned
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self): def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
...@@ -742,11 +750,16 @@ class MyTDSql: ...@@ -742,11 +750,16 @@ class MyTDSql:
class DbConnNative(DbConn): class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._type = self.TYPE_NATIVE self._type = self.TYPE_NATIVE
self._conn = None self._conn = None
self._cursor = None self._cursor = None
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
...@@ -763,23 +776,23 @@ class DbConnNative(DbConn): ...@@ -763,23 +776,23 @@ class DbConnNative(DbConn):
break break
return buildPath return buildPath
connInfoDisplayed = False
def openByType(self): # Open connection def openByType(self): # Open connection
cfgPath = self.getBuildPath() + "/test/cfg" cfgPath = self.getBuildPath() + "/test/cfg"
hostAddr = "127.0.0.1" hostAddr = "127.0.0.1"
if not self.connInfoDisplayed:
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
self.connInfoDisplayed = True
self._conn = taos.connect( with self._lock: # force single threading for opening DB connections
host=hostAddr, if not self._connInfoDisplayed:
config=cfgPath) # TODO: make configurable self.__class__._connInfoDisplayed = True # updating CLASS variable
self._cursor = self._conn.cursor() logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
# Get the connection/cursor ready self._conn = taos.connect(
host=hostAddr,
config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
self._cursor.execute('reset query cache') self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every # self._cursor.execute('use db') # do this at the beginning of every
# step
# Open connection # Open connection
self._tdSql = MyTDSql() self._tdSql = MyTDSql()
...@@ -1128,33 +1141,22 @@ class StateMechine: ...@@ -1128,33 +1141,22 @@ class StateMechine:
def _findCurrentState(self): def _findCurrentState(self):
dbc = self._dbConn dbc = self._dbConn
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
if dbc.query("show databases") == 0: # no database?! if not dbc.hasDatabases(): # no database?!
# logger.debug("Found EMPTY state") logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] empty database found, between {} and {}".format(
ts, time.time()))
return StateEmpty() return StateEmpty()
# did not do this when openning connection, and this is NOT the worker # did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own # thread, which does this on their own
dbc.execute("use db") dbc.use("db")
if dbc.query("show tables") == 0: # no tables if not dbc.hasTables(): # no tables
# logger.debug("Found DB ONLY state") logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] DB_ONLY found, between {} and {}".format(
ts, time.time()))
return StateDbOnly() return StateDbOnly()
if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName())
) == 0: # no regular tables sTable = DbManager.getFixedSuperTable()
# logger.debug("Found TABLE_ONLY state") if sTable.hasRegTables(dbc): # no regular tables
logger.debug( logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
"[STT] SUPER_TABLE_ONLY found, between {} and {}".format(
ts, time.time()))
return StateSuperTableOnly() return StateSuperTableOnly()
else: # has actual tables else: # has actual tables
# logger.debug("Found HAS_DATA state") logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] HAS_DATA found, between {} and {}".format(
ts, time.time()))
return StateHasData() return StateHasData()
def transition(self, tasks): def transition(self, tasks):
...@@ -1300,6 +1302,10 @@ class DbManager(): ...@@ -1300,6 +1302,10 @@ class DbManager():
def getFixedSuperTableName(cls): def getFixedSuperTableName(cls):
return "fs_table" return "fs_table"
@classmethod
def getFixedSuperTable(cls):
return TdSuperTable(cls.getFixedSuperTableName())
def releaseTable(self, i): # return the table back, so others can use it def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i) self.tableNumQueue.release(i)
...@@ -1322,7 +1328,9 @@ class DbManager(): ...@@ -1322,7 +1328,9 @@ class DbManager():
self.getNextInt()) self.getNextInt())
def getNextFloat(self): def getNextFloat(self):
return 0.9 + self.getNextInt() ret = 0.9 + self.getNextInt()
# print("Float obtained: {}".format(ret))
return ret
def getTableNameToDelete(self): def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition! tblNum = self.tableNumQueue.pop() # TODO: race condition!
...@@ -1446,6 +1454,30 @@ class Task(): ...@@ -1446,6 +1454,30 @@ class Task():
"To be implemeted by child classes, class name: {}".format( "To be implemeted by child classes, class name: {}".format(
self.__class__.__name__)) self.__class__.__name__))
def _isErrAcceptable(self, errno, msg):
if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY
# 0x200, # invalid SQL, TODO: re-examine with TD-934
0x360, 0x362,
0x369, # tag already exists
0x36A, 0x36B, 0x36D,
0x381,
0x380, # "db not selected"
0x383,
0x386, # DB is being dropped?!
0x503,
0x510, # vnode not in ready state
0x600,
1000 # REST catch-all error
]:
return True # These are the ALWAYS-ACCEPTABLE ones
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
if msg.find("invalid column name") != -1:
return True
return False # Not an acceptable error
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
wt.verifyThreadSelf() wt.verifyThreadSelf()
self._workerThread = wt # type: ignore self._workerThread = wt # type: ignore
...@@ -1464,26 +1496,15 @@ class Task(): ...@@ -1464,26 +1496,15 @@ class Task():
errno2 = err.errno if ( errno2 = err.errno if (
err.errno > 0) else 0x80000000 + err.errno # correct error scheme err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if (gConfig.continue_on_exception): # user choose to continue if (gConfig.continue_on_exception): # user choose to continue
self.logDebug( self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
"[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql)) errno2, err, self._lastSql))
self._err = err self._err = err
elif (errno2 in [ elif self._isErrAcceptable(errno2, err.__str__()):
0x05, # TSDB_CODE_RPC_NOT_READY self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D,
0x381, 0x380, 0x383,
0x386, # DB is being dropped?!
0x503,
0x510, # vnode not in ready state
0x600,
1000 # REST catch-all error
]): # allowed errors
self.logDebug(
"[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql)) errno2, err, self._lastSql))
print("_", end="", flush=True) print("_", end="", flush=True)
self._err = err self._err = err
else: else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql) errno2, err, self._lastSql)
self.logDebug(errMsg) self.logDebug(errMsg)
...@@ -1705,6 +1726,34 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1705,6 +1726,34 @@ class TaskCreateSuperTable(StateTransitionTask):
# automatically # automatically
class TdSuperTable:
def __init__(self, stName):
self._stName = stName
def getRegTables(self, dbc: DbConn):
try:
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise
qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0
def ensureTable(self, dbc: DbConn, regTableName: str):
sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
return
sql = "CREATE TABLE {} USING {} tags ('{}', {})".format(
regTableName, self._stName,
'xyz', '33'
)
dbc.execute(sql)
class TaskReadData(StateTransitionTask): class TaskReadData(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
...@@ -1715,23 +1764,24 @@ class TaskReadData(StateTransitionTask): ...@@ -1715,23 +1764,24 @@ class TaskReadData(StateTransitionTask):
return state.canReadData() return state.canReadData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName() sTable = self._dbManager.getFixedSuperTable()
self.queryWtSql(wt, "select TBNAME from db.{}".format(
sTbName)) # TODO: analyze result set later
if random.randrange( if random.randrange(
5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations 5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
wt.getDbConn().close() wt.getDbConn().close()
wt.getDbConn().open() wt.getDbConn().open()
else:
# wt.getDbConn().getQueryResult() for rTbName in sTable.getRegTables(self._dbManager.getDbConn()): # regular tables
rTables = self.getQueryResult(wt) aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)',
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
for rTbName in rTables: # regular tables 'sum(speed)', 'stddev(speed)',
self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) 'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)']) # TODO: add more from 'top'
try:
# tdSql.query(" cars where tbname in ('carzero', 'carone')") self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName))
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
raise
class TaskDropSuperTable(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask):
@classmethod @classmethod
...@@ -1834,38 +1884,30 @@ class TaskAddData(StateTransitionTask): ...@@ -1834,38 +1884,30 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager ds = self._dbManager
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require tblSeq = list(range(
# this
tblSeq = list(
range(
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq) random.shuffle(tblSeq)
for i in tblSeq: for i in tblSeq:
if (i in self.activeTable): # wow already active if (i in self.activeTable): # wow already active
# logger.info("Concurrent data insertion into table: {}".format(i)) print("x", end="", flush=True) # concurrent insertion
# print("ct({})".format(i), end="", flush=True) # Concurrent
# insertion into table
print("x", end="", flush=True)
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do
# non-increment insertion sTable = ds.getFixedSuperTable()
regTableName = self.getRegTableName( regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
i) # "db.reg_table_{}".format(i) sTable.ensureTable(ds.getDbConn(), regTableName) # Ensure the table exists
for j in range(
self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = ds.getNextInt() nextInt = ds.getNextInt()
if gConfig.record_ops: if gConfig.record_ops:
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write( self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
"Ready to write {} to {}\n".format(
nextInt, regTableName))
self.fAddLogReady.flush() self.fAddLogReady.flush()
os.fsync(self.fAddLogReady) os.fsync(self.fAddLogReady)
sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
regTableName, regTableName,
ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt) ds.getNextTick(), nextInt)
self.execWtSql(wt, sql) self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it # Successfully wrote the data into the DB, let's record it
...@@ -1912,6 +1954,10 @@ class Dice(): ...@@ -1912,6 +1954,10 @@ class Dice():
raise RuntimeError("Cannot throw dice before seeding it") raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop) return random.randrange(start, stop)
@classmethod
def choice(cls, cList):
return random.choice(cList)
class LoggingFilter(logging.Filter): class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord): def filter(self, record: logging.LogRecord):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册