提交 982e6465 编写于 作者: S Steven Li

Refactored crash_gen tool, plus ensured proper termination upon trasntion failure

上级 c633e91b
......@@ -352,6 +352,12 @@ class ThreadCoordinator:
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at
# end, and maybe signal them to stop
if isinstance(err, CrashGenError): # our own transition failure
Logging.info("State transition error")
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("State transition error")
# return transitionFailed # Why did we have this??!!
......@@ -388,12 +394,20 @@ class ThreadCoordinator:
self._syncAtBarrier() # For now just cross the barrier
except threading.BrokenBarrierError as err:
Logging.info("Main loop aborted, caused by worker thread(s) time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, TAOS related threads are:")
Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
ts = ThreadStacks()
workerTimeout = True
# Enable below for deadlock debugging, using gdb to attach to process
# while True:
# Logging.error("Deadlock detected")
# time.sleep(60.0)
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
......@@ -701,7 +715,7 @@ class AnyState:
# task.logDebug("Task success found")
sCnt += 1
if (sCnt >= 2):
raise RuntimeError(
raise CrashGenError(
"Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls):
......@@ -714,7 +728,7 @@ class AnyState:
if task.isSuccess():
sCnt += 1
if (exists and sCnt <= 0):
raise RuntimeError("Unexpected zero success for task type: {}, from tasks: {}"
raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
.format(cls, tasks))
def assertNoTask(self, tasks, cls):
......@@ -727,7 +741,7 @@ class AnyState:
for task in tasks:
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError(
raise CrashGenError(
"Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls):
......@@ -926,8 +940,9 @@ class StateMechine:
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly()
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc, dbName): # no regular tables
if sTable.hasRegTables(dbc): # no regular tables
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly()
else: # has actual tables
......@@ -1050,9 +1065,8 @@ class Database:
def getFixedSuperTableName(cls):
return "fs_table"
def getFixedSuperTable(cls) -> TdSuperTable:
return TdSuperTable(cls.getFixedSuperTableName())
def getFixedSuperTable(self) -> TdSuperTable:
return TdSuperTable(self.getFixedSuperTableName(), self.getName())
# We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp
......@@ -1107,6 +1121,11 @@ class Database:
# print("Float obtained: {}".format(ret))
return ret
ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']
def getNextColor(self):
return random.choice(self.ALL_COLORS)
class TaskExecutor():
class BoundedList:
......@@ -1240,7 +1259,7 @@ class Task():
if errno in [
0x0B, # Unable to establish connection, more details in TD-1648
0x200, # invalid SQL, TODO: re-examine with TD-934
# 0x200, # invalid SQL, TODO: re-examine with TD-934
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
0x213, # "Disconnected from service", result of "kill connection ???"
0x217, # "db not selected", client side defined error code
......@@ -1569,8 +1588,8 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), self._db.getName(),
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'},
{'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'},
dropIfExists = True
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
......@@ -1579,30 +1598,33 @@ class TaskCreateSuperTable(StateTransitionTask):
class TdSuperTable:
def __init__(self, stName):
def __init__(self, stName, dbName):
self._stName = stName
self._dbName = dbName
def getName(self):
return self._stName
def drop(self, dbc, dbName, skipCheck = False):
if self.exists(dbc, dbName) : # if myself exists
def drop(self, dbc, skipCheck = False):
dbName = self._dbName
if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName
dbc.execute("DROP TABLE {}".format(fullTableName))
if not skipCheck:
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
def exists(self, dbc, dbName):
dbc.execute("USE " + dbName)
def exists(self, dbc):
dbc.execute("USE " + self._dbName)
return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static?
def create(self, dbc, dbName, cols: dict, tags: dict,
def create(self, dbc, cols: dict, tags: dict,
dropIfExists = False
'''Creating a super table'''
dbName = self._dbName
dbc.execute("USE " + dbName)
fullTableName = dbName + '.' + self._stName
if dbc.existsSuperTable(self._stName):
......@@ -1623,7 +1645,8 @@ class TdSuperTable:
def getRegTables(self, dbc: DbConn, dbName: str):
def getRegTables(self, dbc: DbConn):
dbName = self._dbName
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
......@@ -1634,10 +1657,11 @@ class TdSuperTable:
qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn, dbName: str):
return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0
def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str):
def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
......@@ -1650,15 +1674,15 @@ class TdSuperTable:
# print("(" + fullTableName[-3:] + ")", end="", flush=True)
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
if task is not None:
task.unlockTable(fullTableName) # no matter what
def _getTagStrForSql(self, dbc, dbName: str) :
tags = self._getTags(dbc, dbName)
def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc)
tagStrs = []
for tagName in tags:
tagType = tags[tagName]
......@@ -1672,36 +1696,86 @@ class TdSuperTable:
raise RuntimeError("Unexpected tag type: {}".format(tagType))
return ", ".join(tagStrs)
def _getTags(self, dbc, dbName) -> dict:
dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
def _getTags(self, dbc) -> dict:
dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
stCols = dbc.getQueryResult()
# print(stCols)
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
# print("Tags retrieved: {}".format(ret))
return ret
def addTag(self, dbc, dbName, tagName, tagType):
if tagName in self._getTags(dbc, dbName): # already
def addTag(self, dbc, tagName, tagType):
if tagName in self._getTags(dbc): # already
# sTable.addTag("extraTag", "int")
sql = "alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType)
sql = "alter table {}.{} add tag {} {}".format(
self._dbName, self._stName, tagName, tagType)
def dropTag(self, dbc, dbName, tagName):
if not tagName in self._getTags(dbc, dbName): # don't have this tag
def dropTag(self, dbc, tagName):
if not tagName in self._getTags(dbc): # don't have this tag
sql = "alter table {}.{} drop tag {}".format(dbName, self._stName, tagName)
sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
def changeTag(self, dbc, dbName, oldTag, newTag):
tags = self._getTags(dbc, dbName)
def changeTag(self, dbc, oldTag, newTag):
tags = self._getTags(dbc)
if not oldTag in tags: # don't have this tag
if newTag in tags: # already have this tag
sql = "alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag)
sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
''' Generate queries to test/exercise this super table '''
ret = [] # type: List[SqlQuery]
for rTbName in self.getRegTables(dbc): # regular tables
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
# Run the query against the regular table first
doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
if not doAggr: # don't do aggregate query, just simple one
ret.append(SqlQuery( # reg table
"select {} from {}.{}".format('*', self._dbName, rTbName)))
ret.append(SqlQuery( # super table
"select {} from {}.{}".format('*', self._dbName, self.getName())))
else: # Aggregate query
aggExpr = Dice.choice([
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
# SELECTOR functions
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
]) # TODO: add more from 'top'
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance
sql = sql + ' GROUP BY color'
# Logging.info("Executing GROUP-BY query: " + sql)
return ret
class TaskReadData(StateTransitionTask):
def getEndState(cls):
......@@ -1716,10 +1790,8 @@ class TaskReadData(StateTransitionTask):
# return True # always
# return gSvcMgr.isActive() # only if it's running TODO: race condition here
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTable = self._db.getFixedSuperTable()
# 1 in 5 chance, simulate a broken connection, only if service stable (not restarting)
def _reconnectIfNeeded(self, wt):
# 1 in 20 chance, simulate a broken connection, only if service stable (not restarting)
if random.randrange(20)==0: # and self._canRestartService(): # TODO: break connection in all situations
# Logging.info("Attempting to reconnect to server") # TODO: change to DEBUG
......@@ -1744,43 +1816,36 @@ class TaskReadData(StateTransitionTask):
return # TODO: fix server restart status race condtion
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbc = wt.getDbConn()
dbName = self._db.getName()
for rTbName in sTable.getRegTables(dbc, dbName): # regular tables
aggExpr = Dice.choice([
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
# SELECTOR functions
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
]) # TODO: add more from 'top'
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
sTable = self._db.getFixedSuperTable()
for q in sTable.generateQueries(dbc): # regular tables
# Run the query against the regular table first
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, rTbName))
# Then run it against the super table
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
sql = q.getSql()
# if 'GROUP BY' in sql:
# Logging.info("Executing GROUP-BY query: " + sql)
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
class SqlQuery:
def buildRandom(cls, db: Database):
'''Build a random query against a certain database'''
dbName = db.getName()
def __init__(self, sql:str = None):
self._sql = sql
def getSql(self):
return self._sql
class TaskDropSuperTable(StateTransitionTask):
def getEndState(cls):
......@@ -1837,19 +1902,18 @@ class TaskAlterTags(StateTransitionTask):
# tblName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn()
sTable = self._db.getFixedSuperTable()
dbName = self._db.getName()
dice = Dice.throw(4)
if dice == 0:
sTable.addTag(dbc, dbName, "extraTag", "int")
sTable.addTag(dbc, "extraTag", "int")
# sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1:
sTable.dropTag(dbc, dbName, "extraTag")
sTable.dropTag(dbc, "extraTag")
# sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2:
sTable.dropTag(dbc, dbName, "newTag")
sTable.dropTag(dbc, "newTag")
# sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
sTable.changeTag(dbc, dbName, "extraTag", "newTag")
sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask):
......@@ -1920,15 +1984,17 @@ class TaskAddData(StateTransitionTask):
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
nextTick = db.getNextTick()
sql += "('{}', {});".format(nextTick, nextInt)
nextColor = db.getNextColor()
sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
def _addData(self, db, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
nextTick = db.getNextTick()
nextColor = db.getNextColor()
if gConfig.record_ops:
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
......@@ -1942,11 +2008,11 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt)
nextTick, nextInt, nextColor)
except: # Any exception at all
if gConfig.verify_data:
......@@ -1964,10 +2030,10 @@ class TaskAddData(StateTransitionTask):
.format(nextInt, readBack), 0x999)
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x991, 0x992] : # not a single result
if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
.format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing
......@@ -2000,11 +2066,12 @@ class TaskAddData(StateTransitionTask):
self.activeTable.add(i) # marking it active
dbName = db.getName()
sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
fullTableName = db.getName() + '.' + regTableName
fullTableName = dbName + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName)
if Dice.throw(1) == 0: # 1 in 2 chance
......@@ -2024,7 +2091,7 @@ class ThreadStacks: # stack info for all threads
self._allStacks[th.native_id] = stack
def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread
for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match
......@@ -2036,9 +2103,9 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
print("\n<----- Thread Info for LWP/ID: {} (Execution stopped at Bottom Frame) <-----".format(thNid))
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
stackFrame = 0
for frame in stack:
for frame in stack: # was using: reversed(stack)
# print(frame)
print("[{sf}] File {filename}, line {lineno}, in {name}".format(
sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
......@@ -78,7 +78,7 @@ class DbConn:
if nRows != 1:
raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
......@@ -349,7 +349,8 @@ class DbConnNative(DbConn):
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open")
raise CrashGenError(
"Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
......@@ -360,8 +361,8 @@ class DbConnNative(DbConn):
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
raise CrashGenError(
"Cannot query database until connection is open, restarting?", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.query(sql)
......@@ -3,14 +3,20 @@ import random
import logging
import os
import taos
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self.errno = errno
def __str__(self):
return self.msg
class CrashGenError(taos.error.ProgrammingError):
# def __init__(self, msg=None, errno=None):
# self.msg = msg
# self.errno = errno
# def __str__(self):
# return self.msg
class LoggingFilter(logging.Filter):
......@@ -168,6 +174,7 @@ class Progress:
tokens = {
......@@ -178,7 +185,8 @@ class Progress:
......@@ -51,10 +51,12 @@ class TdeInstance():
def prepareGcovEnv(cls, env):
# Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html
bPath = cls._getBuildPath() # build PATH
numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3
numSegments = numSegments - 1 # DEBUG only
env['GCOV_PREFIX'] = bPath + '/svc_gcov'
numSegments = len(bPath.split('/')) # "/x/TDengine/build" should yield 3
# numSegments += 2 # cover "/src" after build
# numSegments = numSegments - 1 # DEBUG only
env['GCOV_PREFIX'] = bPath + '/src_s' # Server side source
env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
# VERY VERY important note: GCOV data collection NOT effective upon SIG_KILL
Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
numSegments, env['GCOV_PREFIX'] ))
......@@ -258,14 +260,15 @@ class TdeSubProcess:
# print(myEnv)
# print(myEnv.items())
# print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
useShell = True
self.subProcess = subprocess.Popen(
' '.join(cmdLine) if useShell else cmdLine,
# svcCmdSingle, shell=True, # capture core dump?
# ' '.join(cmdLine) if useShell else cmdLine,
# shell=useShell,
' '.join(cmdLine),
# bufsize=1, # not supported in binary mode
......@@ -273,7 +276,8 @@ class TdeSubProcess:
) # had text=True, which interferred with reading EOF
STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process?
STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
def stop(self):
......@@ -320,8 +324,12 @@ class TdeSubProcess:
retCode = self.subProcess.returncode # should always be there
# May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now
self.subProcess = None
if retCode != 0: # != (- signal.SIGINT):
self.subProcess = None
if retCode == self.SIG_KILL_RETCODE:
Logging.info("TSP.stop(): sub proc KILLED, as expected")
elif retCode == (- self.STOP_SIGNAL):
Logging.info("TSP.stop(), sub process STOPPED, as expected")
elif retCode != 0: # != (- signal.SIGINT):
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
self.STOP_SIGNAL, retCode))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册