未验证 提交 fd0c6ade 编写于 作者: H huili 提交者: GitHub

Merge pull request #4407 from taosdata/feature/crash_gen

Refactored crash_gen tool
...@@ -352,6 +352,12 @@ class ThreadCoordinator: ...@@ -352,6 +352,12 @@ class ThreadCoordinator:
self._execStats.registerFailure("Broken DB Connection") self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at # continue # don't do that, need to tap all threads at
# end, and maybe signal them to stop # end, and maybe signal them to stop
if isinstance(err, CrashGenError): # our own transition failure
Logging.info("State transition error")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("State transition error")
else: else:
raise raise
# return transitionFailed # Why did we have this??!! # return transitionFailed # Why did we have this??!!
...@@ -388,12 +394,20 @@ class ThreadCoordinator: ...@@ -388,12 +394,20 @@ class ThreadCoordinator:
self._syncAtBarrier() # For now just cross the barrier self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP) Progress.emit(Progress.END_THREAD_STEP)
except threading.BrokenBarrierError as err: except threading.BrokenBarrierError as err:
Logging.info("Main loop aborted, caused by worker thread(s) time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout") self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, TAOS related threads are:") Logging.error("\n")
Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
ThreadCoordinator.WORKER_THREAD_TIMEOUT))
Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
ts = ThreadStacks() ts = ThreadStacks()
ts.print(filterInternal=True) ts.print(filterInternal=True)
workerTimeout = True workerTimeout = True
# Enable below for deadlock debugging, using gdb to attach to process
# while True:
# Logging.error("Deadlock detected")
# time.sleep(60.0)
break break
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
...@@ -701,7 +715,7 @@ class AnyState: ...@@ -701,7 +715,7 @@ class AnyState:
# task.logDebug("Task success found") # task.logDebug("Task success found")
sCnt += 1 sCnt += 1
if (sCnt >= 2): if (sCnt >= 2):
raise RuntimeError( raise CrashGenError(
"Unexpected more than 1 success with task: {}".format(cls)) "Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls): def assertIfExistThenSuccess(self, tasks, cls):
...@@ -714,7 +728,7 @@ class AnyState: ...@@ -714,7 +728,7 @@ class AnyState:
if task.isSuccess(): if task.isSuccess():
sCnt += 1 sCnt += 1
if (exists and sCnt <= 0): if (exists and sCnt <= 0):
raise RuntimeError("Unexpected zero success for task type: {}, from tasks: {}" raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
.format(cls, tasks)) .format(cls, tasks))
def assertNoTask(self, tasks, cls): def assertNoTask(self, tasks, cls):
...@@ -727,7 +741,7 @@ class AnyState: ...@@ -727,7 +741,7 @@ class AnyState:
for task in tasks: for task in tasks:
if isinstance(task, cls): if isinstance(task, cls):
if task.isSuccess(): if task.isSuccess():
raise RuntimeError( raise CrashGenError(
"Unexpected successful task: {}".format(cls)) "Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls): def hasSuccess(self, tasks, cls):
...@@ -926,8 +940,9 @@ class StateMechine: ...@@ -926,8 +940,9 @@ class StateMechine:
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly() return StateDbOnly()
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
sTable = self._db.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc, dbName): # no regular tables if sTable.hasRegTables(dbc): # no regular tables
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly() return StateSuperTableOnly()
else: # has actual tables else: # has actual tables
...@@ -1050,9 +1065,8 @@ class Database: ...@@ -1050,9 +1065,8 @@ class Database:
def getFixedSuperTableName(cls): def getFixedSuperTableName(cls):
return "fs_table" return "fs_table"
@classmethod def getFixedSuperTable(self) -> TdSuperTable:
def getFixedSuperTable(cls) -> TdSuperTable: return TdSuperTable(self.getFixedSuperTableName(), self.getName())
return TdSuperTable(cls.getFixedSuperTableName())
# We aim to create a starting time tick, such that, whenever we run our test here once # We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp # We should be able to safely create 100,000 records, which will not have any repeated time stamp
...@@ -1107,6 +1121,11 @@ class Database: ...@@ -1107,6 +1121,11 @@ class Database:
# print("Float obtained: {}".format(ret)) # print("Float obtained: {}".format(ret))
return ret return ret
ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']
def getNextColor(self):
return random.choice(self.ALL_COLORS)
class TaskExecutor(): class TaskExecutor():
class BoundedList: class BoundedList:
...@@ -1240,7 +1259,7 @@ class Task(): ...@@ -1240,7 +1259,7 @@ class Task():
if errno in [ if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY 0x05, # TSDB_CODE_RPC_NOT_READY
0x0B, # Unable to establish connection, more details in TD-1648 0x0B, # Unable to establish connection, more details in TD-1648
0x200, # invalid SQL, TODO: re-examine with TD-934 # 0x200, # invalid SQL, TODO: re-examine with TD-934
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776 0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
0x213, # "Disconnected from service", result of "kill connection ???" 0x213, # "Disconnected from service", result of "kill connection ???"
0x217, # "db not selected", client side defined error code 0x217, # "db not selected", client side defined error code
...@@ -1569,8 +1588,8 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1569,8 +1588,8 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), self._db.getName(), sTable.create(wt.getDbConn(),
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}, {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'},
dropIfExists = True dropIfExists = True
) )
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
...@@ -1579,30 +1598,33 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1579,30 +1598,33 @@ class TaskCreateSuperTable(StateTransitionTask):
class TdSuperTable: class TdSuperTable:
def __init__(self, stName): def __init__(self, stName, dbName):
self._stName = stName self._stName = stName
self._dbName = dbName
def getName(self): def getName(self):
return self._stName return self._stName
def drop(self, dbc, dbName, skipCheck = False): def drop(self, dbc, skipCheck = False):
if self.exists(dbc, dbName) : # if myself exists dbName = self._dbName
if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName fullTableName = dbName + '.' + self._stName
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
else: else:
if not skipCheck: if not skipCheck:
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName)) raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
def exists(self, dbc, dbName): def exists(self, dbc):
dbc.execute("USE " + dbName) dbc.execute("USE " + self._dbName)
return dbc.existsSuperTable(self._stName) return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static? # TODO: odd semantic, create() method is usually static?
def create(self, dbc, dbName, cols: dict, tags: dict, def create(self, dbc, cols: dict, tags: dict,
dropIfExists = False dropIfExists = False
): ):
'''Creating a super table''' '''Creating a super table'''
dbName = self._dbName
dbc.execute("USE " + dbName) dbc.execute("USE " + dbName)
fullTableName = dbName + '.' + self._stName fullTableName = dbName + '.' + self._stName
if dbc.existsSuperTable(self._stName): if dbc.existsSuperTable(self._stName):
...@@ -1623,7 +1645,8 @@ class TdSuperTable: ...@@ -1623,7 +1645,8 @@ class TdSuperTable:
) )
dbc.execute(sql) dbc.execute(sql)
def getRegTables(self, dbc: DbConn, dbName: str): def getRegTables(self, dbc: DbConn):
dbName = self._dbName
try: try:
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
...@@ -1634,10 +1657,11 @@ class TdSuperTable: ...@@ -1634,10 +1657,11 @@ class TdSuperTable:
qr = dbc.getQueryResult() qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn, dbName: str): def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0 return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str): def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already if dbc.query(sql) >= 1 : # reg table exists already
return return
...@@ -1650,15 +1674,15 @@ class TdSuperTable: ...@@ -1650,15 +1674,15 @@ class TdSuperTable:
# print("(" + fullTableName[-3:] + ")", end="", flush=True) # print("(" + fullTableName[-3:] + ")", end="", flush=True)
try: try:
sql = "CREATE TABLE {} USING {}.{} tags ({})".format( sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName) fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
) )
dbc.execute(sql) dbc.execute(sql)
finally: finally:
if task is not None: if task is not None:
task.unlockTable(fullTableName) # no matter what task.unlockTable(fullTableName) # no matter what
def _getTagStrForSql(self, dbc, dbName: str) : def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc, dbName) tags = self._getTags(dbc)
tagStrs = [] tagStrs = []
for tagName in tags: for tagName in tags:
tagType = tags[tagName] tagType = tags[tagName]
...@@ -1672,36 +1696,86 @@ class TdSuperTable: ...@@ -1672,36 +1696,86 @@ class TdSuperTable:
raise RuntimeError("Unexpected tag type: {}".format(tagType)) raise RuntimeError("Unexpected tag type: {}".format(tagType))
return ", ".join(tagStrs) return ", ".join(tagStrs)
def _getTags(self, dbc, dbName) -> dict: def _getTags(self, dbc) -> dict:
dbc.query("DESCRIBE {}.{}".format(dbName, self._stName)) dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
stCols = dbc.getQueryResult() stCols = dbc.getQueryResult()
# print(stCols) # print(stCols)
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
# print("Tags retrieved: {}".format(ret)) # print("Tags retrieved: {}".format(ret))
return ret return ret
def addTag(self, dbc, dbName, tagName, tagType): def addTag(self, dbc, tagName, tagType):
if tagName in self._getTags(dbc, dbName): # already if tagName in self._getTags(dbc): # already
return return
# sTable.addTag("extraTag", "int") # sTable.addTag("extraTag", "int")
sql = "alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType) sql = "alter table {}.{} add tag {} {}".format(
self._dbName, self._stName, tagName, tagType)
dbc.execute(sql) dbc.execute(sql)
def dropTag(self, dbc, dbName, tagName): def dropTag(self, dbc, tagName):
if not tagName in self._getTags(dbc, dbName): # don't have this tag if not tagName in self._getTags(dbc): # don't have this tag
return return
sql = "alter table {}.{} drop tag {}".format(dbName, self._stName, tagName) sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
dbc.execute(sql) dbc.execute(sql)
def changeTag(self, dbc, dbName, oldTag, newTag): def changeTag(self, dbc, oldTag, newTag):
tags = self._getTags(dbc, dbName) tags = self._getTags(dbc)
if not oldTag in tags: # don't have this tag if not oldTag in tags: # don't have this tag
return return
if newTag in tags: # already have this tag if newTag in tags: # already have this tag
return return
sql = "alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag) sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
dbc.execute(sql) dbc.execute(sql)
def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
''' Generate queries to test/exercise this super table '''
ret = [] # type: List[SqlQuery]
for rTbName in self.getRegTables(dbc): # regular tables
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
None
])
# Run the query against the regular table first
doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
if not doAggr: # don't do aggregate query, just simple one
ret.append(SqlQuery( # reg table
"select {} from {}.{}".format('*', self._dbName, rTbName)))
ret.append(SqlQuery( # super table
"select {} from {}.{}".format('*', self._dbName, self.getName())))
else: # Aggregate query
aggExpr = Dice.choice([
'count(*)',
'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(speed)',
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
]) # TODO: add more from 'top'
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance
sql = sql + ' GROUP BY color'
Progress.emit(Progress.QUERY_GROUP_BY)
# Logging.info("Executing GROUP-BY query: " + sql)
ret.append(SqlQuery(sql))
return ret
class TaskReadData(StateTransitionTask): class TaskReadData(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
...@@ -1716,10 +1790,8 @@ class TaskReadData(StateTransitionTask): ...@@ -1716,10 +1790,8 @@ class TaskReadData(StateTransitionTask):
# return True # always # return True # always
# return gSvcMgr.isActive() # only if it's running TODO: race condition here # return gSvcMgr.isActive() # only if it's running TODO: race condition here
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _reconnectIfNeeded(self, wt):
sTable = self._db.getFixedSuperTable() # 1 in 20 chance, simulate a broken connection, only if service stable (not restarting)
# 1 in 5 chance, simulate a broken connection, only if service stable (not restarting)
if random.randrange(20)==0: # and self._canRestartService(): # TODO: break connection in all situations if random.randrange(20)==0: # and self._canRestartService(): # TODO: break connection in all situations
# Logging.info("Attempting to reconnect to server") # TODO: change to DEBUG # Logging.info("Attempting to reconnect to server") # TODO: change to DEBUG
Progress.emit(Progress.SERVICE_RECONNECT_START) Progress.emit(Progress.SERVICE_RECONNECT_START)
...@@ -1744,43 +1816,36 @@ class TaskReadData(StateTransitionTask): ...@@ -1744,43 +1816,36 @@ class TaskReadData(StateTransitionTask):
return # TODO: fix server restart status race condtion return # TODO: fix server restart status race condtion
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self._reconnectIfNeeded(wt)
dbc = wt.getDbConn() dbc = wt.getDbConn()
dbName = self._db.getName() sTable = self._db.getFixedSuperTable()
for rTbName in sTable.getRegTables(dbc, dbName): # regular tables
aggExpr = Dice.choice([ for q in sTable.generateQueries(dbc): # regular tables
'*',
'count(*)',
'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(speed)',
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
]) # TODO: add more from 'top'
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
None
])
try: try:
# Run the query against the regular table first sql = q.getSql()
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, rTbName)) # if 'GROUP BY' in sql:
# Then run it against the super table # Logging.info("Executing GROUP-BY query: " + sql)
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! dbc.execute(sql)
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise raise
class SqlQuery:
@classmethod
def buildRandom(cls, db: Database):
'''Build a random query against a certain database'''
dbName = db.getName()
def __init__(self, sql:str = None):
self._sql = sql
def getSql(self):
return self._sql
class TaskDropSuperTable(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
...@@ -1837,19 +1902,18 @@ class TaskAlterTags(StateTransitionTask): ...@@ -1837,19 +1902,18 @@ class TaskAlterTags(StateTransitionTask):
# tblName = self._dbManager.getFixedSuperTableName() # tblName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn() dbc = wt.getDbConn()
sTable = self._db.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
dbName = self._db.getName()
dice = Dice.throw(4) dice = Dice.throw(4)
if dice == 0: if dice == 0:
sTable.addTag(dbc, dbName, "extraTag", "int") sTable.addTag(dbc, "extraTag", "int")
# sql = "alter table db.{} add tag extraTag int".format(tblName) # sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1: elif dice == 1:
sTable.dropTag(dbc, dbName, "extraTag") sTable.dropTag(dbc, "extraTag")
# sql = "alter table db.{} drop tag extraTag".format(tblName) # sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2: elif dice == 2:
sTable.dropTag(dbc, dbName, "newTag") sTable.dropTag(dbc, "newTag")
# sql = "alter table db.{} drop tag newTag".format(tblName) # sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3 else: # dice == 3
sTable.changeTag(dbc, dbName, "extraTag", "newTag") sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName) # sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask): class TaskRestartService(StateTransitionTask):
...@@ -1920,15 +1984,17 @@ class TaskAddData(StateTransitionTask): ...@@ -1920,15 +1984,17 @@ class TaskAddData(StateTransitionTask):
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
sql += "('{}', {});".format(nextTick, nextInt) nextColor = db.getNextColor()
sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
dbc.execute(sql) dbc.execute(sql)
def _addData(self, db, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
nextColor = db.getNextColor()
if gConfig.record_ops: if gConfig.record_ops:
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
...@@ -1942,11 +2008,11 @@ class TaskAddData(StateTransitionTask): ...@@ -1942,11 +2008,11 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try: try:
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName, fullTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt) nextTick, nextInt, nextColor)
dbc.execute(sql) dbc.execute(sql)
except: # Any exception at all except: # Any exception at all
if gConfig.verify_data: if gConfig.verify_data:
...@@ -1964,10 +2030,10 @@ class TaskAddData(StateTransitionTask): ...@@ -1964,10 +2030,10 @@ class TaskAddData(StateTransitionTask):
.format(nextInt, readBack), 0x999) .format(nextInt, readBack), 0x999)
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x991, 0x992] : # not a single result if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result
raise taos.error.ProgrammingError( raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}" "Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"), .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
errno) errno)
elif errno in [0x218, 0x362]: # table doesn't exist elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing # do nothing
...@@ -2000,11 +2066,12 @@ class TaskAddData(StateTransitionTask): ...@@ -2000,11 +2066,12 @@ class TaskAddData(StateTransitionTask):
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
dbName = db.getName()
sTable = db.getFixedSuperTable() sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
fullTableName = db.getName() + '.' + regTableName fullTableName = dbName + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName) # self._unlockTable(fullTableName)
if Dice.throw(1) == 0: # 1 in 2 chance if Dice.throw(1) == 0: # 1 in 2 chance
...@@ -2024,7 +2091,7 @@ class ThreadStacks: # stack info for all threads ...@@ -2024,7 +2091,7 @@ class ThreadStacks: # stack info for all threads
self._allStacks[th.native_id] = stack self._allStacks[th.native_id] = stack
def print(self, filteredEndName = None, filterInternal = False): def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1] lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match if lastFrame.name == filteredEndName : # end did not match
...@@ -2036,9 +2103,9 @@ class ThreadStacks: # stack info for all threads ...@@ -2036,9 +2103,9 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack '__init__']: # the thread that extracted the stack
continue # ignore continue # ignore
# Now print # Now print
print("\n<----- Thread Info for LWP/ID: {} (Execution stopped at Bottom Frame) <-----".format(thNid)) print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
stackFrame = 0 stackFrame = 0
for frame in stack: for frame in stack: # was using: reversed(stack)
# print(frame) # print(frame)
print("[{sf}] File {filename}, line {lineno}, in {name}".format( print("[{sf}] File {filename}, line {lineno}, in {name}".format(
sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name)) sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
......
...@@ -78,7 +78,7 @@ class DbConn: ...@@ -78,7 +78,7 @@ class DbConn:
if nRows != 1: if nRows != 1:
raise taos.error.ProgrammingError( raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows), "Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992) (CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
) )
if self.getResultRows() != 1 or self.getResultCols() != 1: if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql)) raise RuntimeError("Unexpected result set for query: {}".format(sql))
...@@ -349,7 +349,8 @@ class DbConnNative(DbConn): ...@@ -349,7 +349,8 @@ class DbConnNative(DbConn):
def execute(self, sql): def execute(self, sql):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open") raise CrashGenError(
"Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.execute(sql) nRows = self._tdSql.execute(sql)
...@@ -360,8 +361,8 @@ class DbConnNative(DbConn): ...@@ -360,8 +361,8 @@ class DbConnNative(DbConn):
def query(self, sql): # return rows affected def query(self, sql): # return rows affected
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise CrashGenError(
"Cannot query database until connection is open") "Cannot query database until connection is open, restarting?", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.query(sql) nRows = self._tdSql.query(sql)
......
...@@ -3,14 +3,20 @@ import random ...@@ -3,14 +3,20 @@ import random
import logging import logging
import os import os
import taos
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self.errno = errno
def __str__(self): class CrashGenError(taos.error.ProgrammingError):
return self.msg INVALID_EMPTY_RESULT = 0x991
INVALID_MULTIPLE_RESULT = 0x992
DB_CONNECTION_NOT_OPEN = 0x993
# def __init__(self, msg=None, errno=None):
# self.msg = msg
# self.errno = errno
# def __str__(self):
# return self.msg
pass
class LoggingFilter(logging.Filter): class LoggingFilter(logging.Filter):
...@@ -168,6 +174,7 @@ class Progress: ...@@ -168,6 +174,7 @@ class Progress:
SERVICE_RECONNECT_FAILURE = 6 SERVICE_RECONNECT_FAILURE = 6
SERVICE_START_NAP = 7 SERVICE_START_NAP = 7
CREATE_TABLE_ATTEMPT = 8 CREATE_TABLE_ATTEMPT = 8
QUERY_GROUP_BY = 9
tokens = { tokens = {
STEP_BOUNDARY: '.', STEP_BOUNDARY: '.',
...@@ -178,7 +185,8 @@ class Progress: ...@@ -178,7 +185,8 @@ class Progress:
SERVICE_RECONNECT_SUCCESS: '.r>', SERVICE_RECONNECT_SUCCESS: '.r>',
SERVICE_RECONNECT_FAILURE: '.xr>', SERVICE_RECONNECT_FAILURE: '.xr>',
SERVICE_START_NAP: '_zz', SERVICE_START_NAP: '_zz',
CREATE_TABLE_ATTEMPT: '_c', CREATE_TABLE_ATTEMPT: 'c',
QUERY_GROUP_BY: 'g',
} }
@classmethod @classmethod
......
...@@ -51,10 +51,12 @@ class TdeInstance(): ...@@ -51,10 +51,12 @@ class TdeInstance():
def prepareGcovEnv(cls, env): def prepareGcovEnv(cls, env):
# Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html # Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html
bPath = cls._getBuildPath() # build PATH bPath = cls._getBuildPath() # build PATH
numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3 numSegments = len(bPath.split('/')) # "/x/TDengine/build" should yield 3
numSegments = numSegments - 1 # DEBUG only # numSegments += 2 # cover "/src" after build
env['GCOV_PREFIX'] = bPath + '/svc_gcov' # numSegments = numSegments - 1 # DEBUG only
env['GCOV_PREFIX'] = bPath + '/src_s' # Server side source
env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
# VERY VERY important note: GCOV data collection NOT effective upon SIG_KILL
Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format( Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
numSegments, env['GCOV_PREFIX'] )) numSegments, env['GCOV_PREFIX'] ))
...@@ -258,14 +260,15 @@ class TdeSubProcess: ...@@ -258,14 +260,15 @@ class TdeSubProcess:
TdeInstance.prepareGcovEnv(myEnv) TdeInstance.prepareGcovEnv(myEnv)
# print(myEnv) # print(myEnv)
# print(myEnv.items()) # print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr)) # print("Starting TDengine via Shell: {}".format(cmdLineStr))
useShell = True useShell = True
self.subProcess = subprocess.Popen( self.subProcess = subprocess.Popen(
' '.join(cmdLine) if useShell else cmdLine, # ' '.join(cmdLine) if useShell else cmdLine,
shell=useShell, # shell=useShell,
# svcCmdSingle, shell=True, # capture core dump? ' '.join(cmdLine),
shell=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
# bufsize=1, # not supported in binary mode # bufsize=1, # not supported in binary mode
...@@ -273,7 +276,8 @@ class TdeSubProcess: ...@@ -273,7 +276,8 @@ class TdeSubProcess:
env=myEnv env=myEnv
) # had text=True, which interferred with reading EOF ) # had text=True, which interferred with reading EOF
STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process? STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
def stop(self): def stop(self):
""" """
...@@ -321,7 +325,11 @@ class TdeSubProcess: ...@@ -321,7 +325,11 @@ class TdeSubProcess:
# May throw subprocess.TimeoutExpired exception above, therefore # May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now # The process is guranteed to have ended by now
self.subProcess = None self.subProcess = None
if retCode != 0: # != (- signal.SIGINT): if retCode == self.SIG_KILL_RETCODE:
Logging.info("TSP.stop(): sub proc KILLED, as expected")
elif retCode == (- self.STOP_SIGNAL):
Logging.info("TSP.stop(), sub process STOPPED, as expected")
elif retCode != 0: # != (- signal.SIGINT):
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format( Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
self.STOP_SIGNAL, retCode)) self.STOP_SIGNAL, retCode))
else: else:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册