未验证 提交 744c2550 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2755 from taosdata/feature/crash_gen

Crash_gen tool update to duplicate TD-989
......@@ -42,6 +42,13 @@ import os
import io
import signal
import traceback
import psutil
print("Psutil module needed, please install: sudo pip3 install psutil")
# Require Python 3
if sys.version_info[0] < 3:
raise Exception("Must be using Python 3")
......@@ -52,13 +59,12 @@ if sys.version_info[0] < 3:
# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
gConfig = argparse.Namespace() # Dummy value, will be replaced later
gSvcMgr = None # TODO: refactor this hack, use dep injection
logger = None
def runThread(wt: WorkerThread):
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
......@@ -69,8 +75,7 @@ class CrashGenError(Exception):
class WorkerThread:
def __init__(self, pool: ThreadPool, tid,
tc: ThreadCoordinator,
def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
# te: TaskExecutor,
): # note: main thread context!
# self._curStep = -1
......@@ -131,18 +136,28 @@ class WorkerThread:
# clean up
if (gConfig.per_thread_db_connection): # type: ignore
if self._dbConn.isOpen: #sometimes it is not open
logger.warning("Cleaning up worker thread, dbConn already closed")
def _doTaskLoop(self):
# while self._curStep < self._pool.maxSteps:
# tc = ThreadCoordinator(None)
while True:
tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
except threading.BrokenBarrierError as err: # main thread timed out
print("_bto", end="")
logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning():
print("_wts", end="")
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
......@@ -159,6 +174,7 @@ class WorkerThread:
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died
def verifyThreadSelf(self): # ensure we are called by this own thread
if (threading.get_ident() != self._thread.ident):
......@@ -187,30 +203,24 @@ class WorkerThread:
# self._curStep += 1 # off to a new step...
def tapStepGate(self): # give it a tap, release the thread waiting there
# self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread
logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
if self._thread.is_alive():
logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
print("_tad", end="") # Thread already dead
def execSql(self, sql): # TODO: expose DbConn directly
if (gConfig.per_thread_db_connection):
return self._dbConn.execute(sql)
return self._tc.getDbManager().getDbConn().execute(sql)
return self.getDbConn().execute(sql)
def querySql(self, sql): # TODO: expose DbConn directly
if (gConfig.per_thread_db_connection):
return self._dbConn.query(sql)
return self._tc.getDbManager().getDbConn().query(sql)
return self.getDbConn().query(sql)
def getQueryResult(self):
if (gConfig.per_thread_db_connection):
return self._dbConn.getQueryResult()
return self._tc.getDbManager().getDbConn().getQueryResult()
return self.getDbConn().getQueryResult()
def getDbConn(self):
if (gConfig.per_thread_db_connection):
......@@ -228,6 +238,8 @@ class WorkerThread:
class ThreadCoordinator:
def __init__(self, pool: ThreadPool, dbManager):
self._curStep = -1 # first step is 0
self._pool = pool
......@@ -248,14 +260,14 @@ class ThreadCoordinator:
def getDbManager(self) -> DbManager:
return self._dbManager
def crossStepBarrier(self):
def crossStepBarrier(self, timeout=None):
def requestToStop(self):
self._runStatus = MainExec.STATUS_STOPPING
self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask):
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = gConfig.max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True
......@@ -265,6 +277,8 @@ class ThreadCoordinator:
return True
if hasAbortedTask:
return True
if workerTimeout:
return True
return False
def _hasAbortedTask(self): # from execution of previous step
......@@ -296,7 +310,7 @@ class ThreadCoordinator:
# let other threads go past the pool barrier, but wait at the
# thread gate
logger.debug("[TRD] Main thread about to cross the barrier")
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
logger.debug("[TRD] Main thread finished crossing the barrier")
......@@ -327,6 +341,7 @@ class ThreadCoordinator:
# end, and maybe signal them to stop
return transitionFailed
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step
......@@ -342,11 +357,21 @@ class ThreadCoordinator:
self._execStats.startExec() # start the stop watch
transitionFailed = False
hasAbortedTask = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask):
workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not gConfig.debug: # print this only if we are not in debug mode
print(".", end="", flush=True)
self._syncAtBarrier() # For now just cross the barrier
self._syncAtBarrier() # For now just cross the barrier
except threading.BrokenBarrierError as err:
logger.info("Main loop aborted, caused by worker thread time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, important thread info:")
ts = ThreadStacks()
workerTimeout = True
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
# We use this period to do house keeping work, when all worker
......@@ -358,12 +383,20 @@ class ThreadCoordinator:
break # do transition only if tasks are error free
# Ending previous step
transitionFailed = self._doTransition() # To start, we end step -1 first
transitionFailed = self._doTransition() # To start, we end step -1 first
except taos.error.ProgrammingError as err:
transitionFailed = True
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
logger.info("Transition failed: errno=0x{:X}, msg: {}".format(errno2, err))
# Then we move on to the next step
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
logger.debug("Abnormal ending of main thraed")
elif workerTimeout:
logger.debug("Abnormal ending of main thread, due to worker timeout")
else: # regular ending, workers waiting at "barrier"
logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
......@@ -561,6 +594,10 @@ class DbConn:
def __init__(self):
self.isOpen = False
self._type = self.TYPE_INVALID
self._lastSql = None
def getLastSql(self):
return self._lastSql
def open(self):
if (self.isOpen):
......@@ -569,9 +606,7 @@ class DbConn:
# below implemented by child classes
"[DB] data connection opened, type = {}".format(
logger.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True
def resetDb(self): # reset the whole database, etc.
......@@ -594,21 +629,29 @@ class DbConn:
def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
raise RuntimeError(
"Unexpected result for query: {}, rows = {}".format(
sql, nRows))
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError(
"Unexpected result set for query: {}".format(sql))
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def use(self, dbName):
self.execute("use {}".format(dbName))
def hasDatabases(self):
return self.query("show databases") > 0
def hasTables(self):
return self.query("show tables") > 0
def execute(self, sql):
raise RuntimeError("Unexpected execution, should be overriden")
def query(self, sql) -> int: # return num rows returned
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
......@@ -643,10 +686,11 @@ class DbConnRest(DbConn):
self.isOpen = False
def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
auth = HTTPBasicAuth('root', 'taosdata'))
print("REST API Failure (TODO: more info here)")
......@@ -742,11 +786,16 @@ class MyTDSql:
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
def __init__(self):
self._type = self.TYPE_NATIVE
self._conn = None
self._cursor = None
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......@@ -755,31 +804,32 @@ class DbConnNative(DbConn):
projPath = selfPath[:selfPath.find("tests")]
buildPath = None
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
if buildPath == None:
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(self_path))
return buildPath
connInfoDisplayed = False
def openByType(self): # Open connection
cfgPath = self.getBuildPath() + "/test/cfg"
hostAddr = ""
if not self.connInfoDisplayed:
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
self.connInfoDisplayed = True
self._conn = taos.connect(
config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
with self._lock: # force single threading for opening DB connections
if not self._connInfoDisplayed:
self.__class__._connInfoDisplayed = True # updating CLASS variable
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every
# step
# Open connection
self._tdSql = MyTDSql()
......@@ -984,29 +1034,11 @@ class StateDbOnly(AnyState):
if (not self.hasTask(tasks, TaskCreateDb)):
# only if we don't create any more
self.assertAtMostOneSuccess(tasks, TaskDropDb)
self.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
# Nothing to be said about adding data task
# if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
# self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY
# if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
# if ( not self.hasTask(tasks, TaskDropSuperTable) ):
# self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
# self._state = self.STATE_TABLE_ONLY
# else:
# self._state = self.STATE_HAS_DATA
# What about AddFixedData?
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
# self._state = self.STATE_HAS_DATA
# else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
# self._state = self.STATE_DB_ONLY # no change
# TODO: restore the below, the problem exists, although unlikely in real-world
# if (gSvcMgr!=None) and gSvcMgr.isRestarting():
# if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
# self.assertIfExistThenSuccess(tasks, TaskDropDb)
class StateSuperTableOnly(AnyState):
......@@ -1082,7 +1114,7 @@ class StateMechine:
self._curState = self._findCurrentState() # starting state
# transitition target probabilities, indexed with value of STATE_EMPTY,
self._stateWeights = [1, 3, 5, 15]
self._stateWeights = [1, 2, 10, 40]
def getCurrentState(self):
return self._curState
......@@ -1128,33 +1160,22 @@ class StateMechine:
def _findCurrentState(self):
dbc = self._dbConn
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
if dbc.query("show databases") == 0: # no database?!
# logger.debug("Found EMPTY state")
"[STT] empty database found, between {} and {}".format(
ts, time.time()))
if not dbc.hasDatabases(): # no database?!
logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty()
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
dbc.execute("use db")
if dbc.query("show tables") == 0: # no tables
# logger.debug("Found DB ONLY state")
"[STT] DB_ONLY found, between {} and {}".format(
ts, time.time()))
if not dbc.hasTables(): # no tables
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly()
if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName())
) == 0: # no regular tables
# logger.debug("Found TABLE_ONLY state")
"[STT] SUPER_TABLE_ONLY found, between {} and {}".format(
ts, time.time()))
sTable = DbManager.getFixedSuperTable()
if sTable.hasRegTables(dbc): # no regular tables
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly()
else: # has actual tables
# logger.debug("Found HAS_DATA state")
"[STT] HAS_DATA found, between {} and {}".format(
ts, time.time()))
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
return StateHasData()
def transition(self, tasks):
......@@ -1172,7 +1193,8 @@ class StateMechine:
# case of multiple creation and drops
if self._curState.canDropDb():
self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
if gSvcMgr == None: # only if we are running as client-only
self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
# case of drop-create-drop
......@@ -1300,13 +1322,17 @@ class DbManager():
def getFixedSuperTableName(cls):
return "fs_table"
def getFixedSuperTable(cls):
return TdSuperTable(cls.getFixedSuperTableName())
def releaseTable(self, i): # return the table back, so others can use it
def getNextTick(self):
with self._lock: # prevent duplicate tick
if Dice.throw(10) == 0: # 1 in 10 chance
return self._lastTick + datetime.timedelta(0, -100)
if Dice.throw(20) == 0: # 1 in 20 chance
return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds
else: # regular
# add one second to it
self._lastTick += datetime.timedelta(0, 1)
......@@ -1322,7 +1348,9 @@ class DbManager():
def getNextFloat(self):
return 0.9 + self.getNextInt()
ret = 0.9 + self.getNextInt()
# print("Float obtained: {}".format(ret))
return ret
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
......@@ -1340,33 +1368,35 @@ class TaskExecutor():
def __init__(self, size=10):
self._size = size
self._list = []
self._lock = threading.Lock()
def add(self, n: int):
if not self._list: # empty
# now we should insert
nItems = len(self._list)
insPos = 0
for i in range(nItems):
insPos = i
if n <= self._list[i]: # smaller than this item, time to insert
break # found the insertion point
insPos += 1 # insert to the right
if insPos == 0: # except for the 1st item, # TODO: elimiate first item as gating item
return # do nothing
# print("Inserting at postion {}, value: {}".format(insPos, n))
self._list.insert(insPos, n) # insert
newLen = len(self._list)
if newLen <= self._size:
return # do nothing
elif newLen == (self._size + 1):
del self._list[0] # remove the first item
raise RuntimeError("Corrupt Bounded List")
with self._lock:
if not self._list: # empty
# now we should insert
nItems = len(self._list)
insPos = 0
for i in range(nItems):
insPos = i
if n <= self._list[i]: # smaller than this item, time to insert
break # found the insertion point
insPos += 1 # insert to the right
if insPos == 0: # except for the 1st item, # TODO: elimiate first item as gating item
return # do nothing
# print("Inserting at postion {}, value: {}".format(insPos, n))
self._list.insert(insPos, n) # insert
newLen = len(self._list)
if newLen <= self._size:
return # do nothing
elif newLen == (self._size + 1):
del self._list[0] # remove the first item
raise RuntimeError("Corrupt Bounded List")
def __str__(self):
return repr(self._list)
......@@ -1419,7 +1449,6 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats
self._lastSql = "" # last SQL executed/attempted
def isSuccess(self):
return self._err is None
......@@ -1446,6 +1475,39 @@ class Task():
"To be implemeted by child classes, class name: {}".format(
def _isErrAcceptable(self, errno, msg):
if errno in [
# 0x200, # invalid SQL, TODO: re-examine with TD-934
0x360, 0x362,
0x369, # tag already exists
0x36A, 0x36B, 0x36D,
0x380, # "db not selected"
0x386, # DB is being dropped?!
0x510, # vnode not in ready state
1000 # REST catch-all error
return True # These are the ALWAYS-ACCEPTABLE ones
elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
return True # We may get "network unavilable" when restarting service
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
if msg.find("invalid column name") != -1:
return True
elif msg.find("tags number not matched") != -1: # mismatched tags after modification
return True
elif msg.find("duplicated column names") != -1: # also alter table tag issues
return True
elif (gSvcMgr!=None) and gSvcMgr.isRestarting():
logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
return True
return False # Not an acceptable error
def execute(self, wt: WorkerThread):
self._workerThread = wt # type: ignore
......@@ -1456,36 +1518,25 @@ class Task():
"[-] executing task {}...".format(self.__class__.__name__))
self._err = None
self.__class__.__name__) # mark beginning
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
errno2 = None
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = err.errno if (
err.errno > 0) else 0x80000000 + err.errno # correct error scheme
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if (gConfig.continue_on_exception): # user choose to continue
"[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql))
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql()))
self._err = err
elif (errno2 in [
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D,
0x381, 0x380, 0x383,
0x386, # DB is being dropped?!
0x510, # vnode not in ready state
1000 # REST catch-all error
]): # allowed errors
"[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql))
elif self._isErrAcceptable(errno2, err.__str__()):
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql()))
print("_", end="", flush=True)
self._err = err
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql)
else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())
if gConfig.debug:
# raise # so that we see full stack
......@@ -1509,25 +1560,22 @@ class Task():
except BaseException:
"[=] Unexpected exception, SQL: {}".format(
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format(
self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
# TODO: merge with above.
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess())
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)
def execSql(self, sql):
self._lastSql = sql
return self._dbManager.execute(sql)
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.querySql(sql)
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
......@@ -1542,6 +1590,7 @@ class ExecutionStats:
self._lock = threading.Lock()
self._firstTaskStartTime = None
self._execStartTime = None
self._errors = {}
self._elapsedTime = 0.0 # total elapsed time
self._accRunTime = 0.0 # accumulated run time
......@@ -1561,13 +1610,18 @@ class ExecutionStats:
def endExec(self):
self._elapsedTime = time.time() - self._execStartTime
def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
def incExecCount(self, klassName, isSuccess, eno=None): # TODO: add a lock here
if klassName not in self._execTimes:
self._execTimes[klassName] = [0, 0]
t = self._execTimes[klassName] # tuple for the data
t[0] += 1 # index 0 has the "total" execution times
if isSuccess:
t[1] += 1 # index 1 has the "success" execution times
if eno != None:
if klassName not in self._errors:
self._errors[klassName] = {}
errors = self._errors[klassName]
errors[eno] = errors[eno]+1 if eno in errors else 1
def beginTaskType(self, klassName):
with self._lock:
......@@ -1597,7 +1651,14 @@ class ExecutionStats:
execTimesAny = 0
for k, n in self._execTimes.items():
execTimesAny += n[0]
logger.info("| {0:<24}: {1}/{2}".format(k, n[1], n[0]))
errStr = None
if k in self._errors:
errors = self._errors[k]
# print("errors = {}".format(errors))
errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()]
# print("error strings = {}".format(errStrs))
errStr = ", ".join(errStrs)
logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))
"| Total Tasks Executed (success or not): {} ".format(execTimesAny))
......@@ -1649,7 +1710,7 @@ class StateTransitionTask(Task):
def getRegTableName(cls, i):
return "db.reg_table_{}".format(i)
return "reg_table_{}".format(i)
def execute(self, wt: WorkerThread):
......@@ -1696,15 +1757,94 @@ class TaskCreateSuperTable(StateTransitionTask):
logger.debug("Skipping task, no DB yet")
tblName = self._dbManager.getFixedSuperTableName()
sTable = self._dbManager.getFixedSuperTable()
# wt.execSql("use db") # should always be in place
"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that
# automatically
class TdSuperTable:
def __init__(self, stName):
self._stName = stName
def create(self, dbc, cols: dict, tags: dict):
sql = "CREATE TABLE db.{} ({}) TAGS ({})".format(
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]),
",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
def getRegTables(self, dbc: DbConn):
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0
def ensureTable(self, dbc: DbConn, regTableName: str):
sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
sql = "CREATE TABLE {} USING {} tags ({})".format(
regTableName, self._stName, self._getTagStrForSql(dbc)
def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc)
tagStrs = []
for tagName in tags:
tagType = tags[tagName]
if tagType == 'BINARY':
elif tagType == 'FLOAT':
elif tagType == 'INT':
raise RuntimeError("Unexpected tag type: {}".format(tagType))
return ", ".join(tagStrs)
def _getTags(self, dbc) -> dict:
dbc.query("DESCRIBE {}".format(self._stName))
stCols = dbc.getQueryResult()
# print(stCols)
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
# print("Tags retrieved: {}".format(ret))
return ret
def addTag(self, dbc, tagName, tagType):
if tagName in self._getTags(dbc): # already
# sTable.addTag("extraTag", "int")
sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType)
def dropTag(self, dbc, tagName):
if not tagName in self._getTags(dbc): # don't have this tag
sql = "alter table db.{} drop tag {}".format(self._stName, tagName)
def changeTag(self, dbc, oldTag, newTag):
tags = self._getTags(dbc)
if not oldTag in tags: # don't have this tag
if newTag in tags: # already have this tag
sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag)
class TaskReadData(StateTransitionTask):
def getEndState(cls):
......@@ -1715,23 +1855,24 @@ class TaskReadData(StateTransitionTask):
return state.canReadData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName()
self.queryWtSql(wt, "select TBNAME from db.{}".format(
sTbName)) # TODO: analyze result set later
sTable = self._dbManager.getFixedSuperTable()
if random.randrange(
5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
# wt.getDbConn().getQueryResult()
rTables = self.getQueryResult(wt)
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables: # regular tables
self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
for rTbName in sTable.getRegTables(wt.getDbConn()): # regular tables
aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)', 'stddev(speed)',
'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)']) # TODO: add more from 'top'
self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName))
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, wt.getDbConn().getLastSql()))
class TaskDropSuperTable(StateTransitionTask):
......@@ -1789,20 +1930,55 @@ class TaskAlterTags(StateTransitionTask):
return state.canDropFixedSuperTable() # if we can drop it, we can alter tags
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
# tblName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn()
sTable = self._dbManager.getFixedSuperTable()
dice = Dice.throw(4)
if dice == 0:
sql = "alter table db.{} add tag extraTag int".format(tblName)
sTable.addTag(dbc, "extraTag", "int")
# sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1:
sql = "alter table db.{} drop tag extraTag".format(tblName)
sTable.dropTag(dbc, "extraTag")
# sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2:
sql = "alter table db.{} drop tag newTag".format(tblName)
sTable.dropTag(dbc, "newTag")
# sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
sql = "alter table db.{} change tag extraTag newTag".format(
sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
self.execWtSql(wt, sql)
class TaskRestartService(StateTransitionTask):
_isRunning = False
_classLock = threading.Lock()
def getEndState(cls):
return None # meaning doesn't affect state
def canBeginFrom(cls, state: AnyState):
if gConfig.auto_start_service:
return state.canDropFixedSuperTable() # Basicallly when we have the super table
return False # don't run this otherwise
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not gConfig.auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True)
with self._classLock:
if self._isRunning:
print("Skipping restart task, another running already")
self._isRunning = True
if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
dbc = wt.getDbConn()
dbc.execute("show databases") # simple delay, align timing with other workers
self._isRunning = False
class TaskAddData(StateTransitionTask):
# Track which table is being actively worked on
......@@ -1833,39 +2009,31 @@ class TaskAddData(StateTransitionTask):
return state.canAddData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require
# this
tblSeq = list(
ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
tblSeq = list(range(
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
for i in tblSeq:
if (i in self.activeTable): # wow already active
# logger.info("Concurrent data insertion into table: {}".format(i))
# print("ct({})".format(i), end="", flush=True) # Concurrent
# insertion into table
print("x", end="", flush=True)
print("x", end="", flush=True) # concurrent insertion
self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do
# non-increment insertion
regTableName = self.getRegTableName(
i) # "db.reg_table_{}".format(i)
for j in range(
self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
sTable = ds.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = ds.getNextInt()
if gConfig.record_ops:
"Ready to write {} to {}\n".format(
nextInt, regTableName))
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format(
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
ds.getNextBinary(), ds.getNextFloat(),
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt)
self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it
......@@ -1912,6 +2080,10 @@ class Dice():
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
def choice(cls, cList):
return random.choice(cList)
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
......@@ -1934,14 +2106,16 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class SvcManager:
def __init__(self):
print("Starting TDengine Service Manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
# signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
# signal.signal(signal.SIGINT, self.sigIntHandler)
# signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService()
self.svcMgrThread = None
self._lock = threading.Lock()
self._isRestarting = False
def _doMenu(self):
choice = ""
......@@ -1976,23 +2150,22 @@ class SvcManager:
elif choice == "2":
elif choice == "3":
elif choice == "3": # Restart
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...")
print("SvcManager: INT Signal Handler starting...")
if self.inSigHandler:
print("Ignoring repeated SIG_INT...")
self.inSigHandler = True
print("INT signal handler returning...")
print("SvcManager: INT Signal Handler returning...")
self.inSigHandler = False
def sigHandlerResume(self):
......@@ -2005,44 +2178,78 @@ class SvcManager:
self.svcMgrThread = None # no more
def _procIpcAll(self):
while self.svcMgrThread: # for as long as the svc mgr thread is still here
self.svcMgrThread.procIpcBatch() # regular processing,
while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
if self.isRunning():
self.svcMgrThread.procIpcBatch() # regular processing,
elif self.isRetarting():
print("Service restarting...")
time.sleep(0.5) # pause, before next round
"Service Manager Thread (with subprocess) has ended, main thread now exiting...")
def startTaosService(self):
if self.svcMgrThread:
raise RuntimeError(
"Cannot start TAOS service when one may already be running")
self.svcMgrThread = ServiceManagerThread() # create the object
print("TAOS service started, printing out output...")
forceOutput=True) # for printing 10 lines
print("TAOS service started")
with self._lock:
if self.svcMgrThread:
raise RuntimeError("Cannot start TAOS service when one may already be running")
# Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter():
if proc.name() == 'taosd':
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
# print("Process: {}".format(proc.name()))
self.svcMgrThread = ServiceManagerThread() # create the object
print("Attempting to start TAOS service started, printing out output...")
forceOutput=True) # for printing 10 lines
print("TAOS service started")
def stopTaosService(self, outputLines=20):
print("Terminating Service Manager Thread (SMT) execution...")
if not self.svcMgrThread:
raise RuntimeError("Unexpected empty svc mgr thread")
if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time
self.svcMgrThread = None
print("----- End of TDengine Service Output -----\n")
print("SMT execution terminated")
print("WARNING: SMT did not terminate as expected")
with self._lock:
if not self.isRunning():
logger.warning("Cannot stop TAOS service, not running")
print("Terminating Service Manager Thread (SMT) execution...")
if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time
self.svcMgrThread = None
print("----- End of TDengine Service Output -----\n")
print("SMT execution terminated")
print("WARNING: SMT did not terminate as expected")
def run(self):
self._procIpcAll() # pump/process all the messages
if self.svcMgrThread: # if sig handler hasn't destroyed it by now
self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
if self.isRunning(): # if sig handler hasn't destroyed it by now
self.stopTaosService() # should have started already
def restart(self):
if self._isRestarting:
logger.warning("Cannot restart service when it's already restarting")
self._isRestarting = True
if self.isRunning():
logger.warning("Service not running when restart requested")
self._isRestarting = False
def isRunning(self):
return self.svcMgrThread != None
def isRestarting(self):
return self._isRestarting
class ServiceManagerThread:
......@@ -2094,6 +2301,7 @@ class ServiceManagerThread:
logger.info("[] TDengine service READY to process requests")
return # now we've started
# TODO: handle this better?
self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully")
def stop(self):
......@@ -2196,7 +2404,10 @@ class ServiceManagerThread:
if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1: # found
self._status = MainExec.STATUS_RUNNING
logger.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this
logger.info("Service is now FULLY READY")
self._status = MainExec.STATUS_RUNNING
# Trim the queue if necessary: TODO: try this 1 out of 10 times
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
......@@ -2242,6 +2453,21 @@ class TdeSubProcess:
taosdPath = self.getBuildPath() + "/build/bin/taosd"
cfgPath = self.getBuildPath() + "/test/cfg"
# Delete the log files
logPath = self.getBuildPath() + "/test/log"
# ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
# filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ]
# for f in filelist:
# filePath = os.path.join(logPath, f)
# print("Removing log file: {}".format(filePath))
# os.remove(filePath)
if os.path.exists(logPath):
logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
logger.info("Saving old log files to: {}".format(logPathSaved))
os.rename(logPath, logPathSaved)
# os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
svcCmd = [taosdPath, '-c', cfgPath]
# svcCmd = ['vmstat', '1']
if self.subProcess: # already there
......@@ -2275,16 +2501,46 @@ class TdeSubProcess:
print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None
class ThreadStacks: # stack info for all threads
def __init__(self):
self._allStacks = {}
allFrames = sys._current_frames()
for th in threading.enumerate():
stack = traceback.extract_stack(allFrames[th.ident])
self._allStacks[th.native_id] = stack
def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread
lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match
if filterInternal:
if lastFrame.name in ['wait', 'invoke_excepthook',
'_wait', # The Barrier exception
'svcOutputReader', # the svcMgr thread
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
print("\n<----- Thread Info for ID: {}".format(thNid))
for frame in stack:
# print(frame)
print("File {filename}, line {lineno}, in {name}".format(
filename=frame.filename, lineno=frame.lineno, name=frame.name))
print(" {}".format(frame.line))
print("-----> End of Thread Info\n")
class ClientManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
# signal.signal(signal.SIGTERM, self.sigIntHandler)
# signal.signal(signal.SIGINT, self.sigIntHandler)
self._status = MainExec.STATUS_RUNNING
self.tc = None
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
if self._status != MainExec.STATUS_RUNNING:
print("Repeated SIGINT received, forced exit...")
......@@ -2292,9 +2548,50 @@ class ClientManager:
self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
print("ClientManager: Terminating program...")
def _doMenu(self):
choice = ""
while True:
print("\nInterrupting Client Program, Choose an Action: ")
print("1: Resume")
print("2: Terminate")
print("3: Show Threads")
# Remember to update the if range below
# print("Enter Choice: ", end="", flush=True)
while choice == "":
choice = input("Enter Choice: ")
if choice != "":
break # done with reading repeated input
if choice in ["1", "2", "3"]:
break # we are done with whole method
print("Invalid choice, please try again.")
choice = "" # reset
return choice
def sigUsrHandler(self, signalNumber, frame):
print("Interrupting main thread execution upon SIGUSR1")
if self.inSigHandler: # already
print("Ignoring repeated SIG_USR1...")
return # do nothing if it's already not running
self.inSigHandler = True
choice = self._doMenu()
if choice == "1":
print("Resuming execution...")
elif choice == "2":
print("Not implemented yet")
elif choice == "3":
ts = ThreadStacks()
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def _printLastNumbers(self): # to verify data durability
dbManager = DbManager(resetDb=False)
dbc = dbManager.getDbConn()
......@@ -2327,21 +2624,17 @@ class ClientManager:
def prepare(self):
def run(self):
if gConfig.auto_start_service:
svcMgr = SvcManager()
def run(self, svcMgr):
dbManager = DbManager() # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
if gConfig.auto_start_service:
if svcMgr: # gConfig.auto_start_service:
# Print exec status, etc., AFTER showing messages from the server
......@@ -2353,25 +2646,58 @@ class ClientManager:
class MainExec:
def runClient(cls):
clientManager = ClientManager()
return clientManager.run()
def __init__(self):
self._clientMgr = None
self._svcMgr = None
def runService(cls):
svcManager = SvcManager()
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
def runTemp(cls): # for debugging purposes
def sigUsrHandler(self, signalNumber, frame):
if self._clientMgr:
self._clientMgr.sigUsrHandler(signalNumber, frame)
elif self._svcMgr: # Only if no client mgr, we are running alone
self._svcMgr.sigUsrHandler(signalNumber, frame)
def sigIntHandler(self, signalNumber, frame):
if self._svcMgr:
self._svcMgr.sigIntHandler(signalNumber, frame)
if self._clientMgr:
self._clientMgr.sigIntHandler(signalNumber, frame)
def runClient(self):
global gSvcMgr
if gConfig.auto_start_service:
self._svcMgr = SvcManager()
gSvcMgr = self._svcMgr # hack alert
self._svcMgr.startTaosService() # we start, don't run
self._clientMgr = ClientManager()
ret = None
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err:
logger.warning("Failed to open REST connection to DB")
# don't raise
return ret
def runService(self):
global gSvcMgr
self._svcMgr = SvcManager()
gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert
self._svcMgr.run() # run to some end state
self._svcMgr = None
gSvcMgr = None
def runTemp(self): # for debugging purposes
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
......@@ -2527,10 +2853,11 @@ def main():
Dice.seed(0) # initial seeding of dice
# Run server or client
mExec = MainExec()
if gConfig.run_tdengine: # run server
return MainExec.runClient()
return mExec.runClient()
if __name__ == "__main__":
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册