提交 ea0af95d 编写于 作者: S Steven Li

Enhanced crash_gen tool to test against multiple databases concurrently,...

Enhanced crash_gen tool to test against multiple databases concurrently, getting ready to test against clusters
上级 2a7b9134
...@@ -53,14 +53,13 @@ except: ...@@ -53,14 +53,13 @@ except:
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
raise Exception("Must be using Python 3") raise Exception("Must be using Python 3")
# Global variables, tried to keep a small number. # Global variables, tried to keep a small number.
# Command-line/Environment Configurations, will set a bit later # Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace # ConfigNameSpace = argparse.Namespace
gConfig = argparse.Namespace() # Dummy value, will be replaced later gConfig = argparse.Namespace() # Dummy value, will be replaced later
gSvcMgr = None # TODO: refactor this hack, use dep injection gSvcMgr = None # TODO: refactor this hack, use dep injection
logger = None logger = None # type: Logger
def runThread(wt: WorkerThread): def runThread(wt: WorkerThread):
wt.run() wt.run()
...@@ -101,7 +100,7 @@ class WorkerThread: ...@@ -101,7 +100,7 @@ class WorkerThread:
else: else:
raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))
self._dbInUse = False # if "use db" was executed already # self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg): def logDebug(self, msg):
logger.debug(" TRD[{}] {}".format(self._tid, msg)) logger.debug(" TRD[{}] {}".format(self._tid, msg))
...@@ -109,13 +108,13 @@ class WorkerThread: ...@@ -109,13 +108,13 @@ class WorkerThread:
def logInfo(self, msg): def logInfo(self, msg):
logger.info(" TRD[{}] {}".format(self._tid, msg)) logger.info(" TRD[{}] {}".format(self._tid, msg))
def dbInUse(self): # def dbInUse(self):
return self._dbInUse # return self._dbInUse
def useDb(self): # def useDb(self):
if (not self._dbInUse): # if (not self._dbInUse):
self.execSql("use db") # self.execSql("use db")
self._dbInUse = True # self._dbInUse = True
def getTaskExecutor(self): def getTaskExecutor(self):
return self._tc.getTaskExecutor() return self._tc.getTaskExecutor()
...@@ -161,12 +160,12 @@ class WorkerThread: ...@@ -161,12 +160,12 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Before we fetch the task and run it, let's ensure we properly "use" the database # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try: try:
if (gConfig.per_thread_db_connection): # most likely TRUE if (gConfig.per_thread_db_connection): # most likely TRUE
if not self._dbConn.isOpen: # might have been closed during server auto-restart if not self._dbConn.isOpen: # might have been closed during server auto-restart
self._dbConn.open() self._dbConn.open()
self.useDb() # might encounter exceptions. TODO: catch # self.useDb() # might encounter exceptions. TODO: catch
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready
...@@ -181,14 +180,13 @@ class WorkerThread: ...@@ -181,14 +180,13 @@ class WorkerThread:
task = tc.fetchTask() task = tc.fetchTask()
# Execute such a task # Execute such a task
logger.debug( logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
"[TRD] Worker thread [{}] about to execute task: {}".format(
self._tid, task.__class__.__name__)) self._tid, task.__class__.__name__))
task.execute(self) task.execute(self)
tc.saveExecutedTask(task) tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps # self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died # print("_wtd", end=None) # worker thread died
def verifyThreadSelf(self): # ensure we are called by this own thread def verifyThreadSelf(self): # ensure we are called by this own thread
...@@ -237,7 +235,7 @@ class WorkerThread: ...@@ -237,7 +235,7 @@ class WorkerThread:
def getQueryResult(self): def getQueryResult(self):
return self.getDbConn().getQueryResult() return self.getDbConn().getQueryResult()
def getDbConn(self): def getDbConn(self) -> DbConn :
if (gConfig.per_thread_db_connection): if (gConfig.per_thread_db_connection):
return self._dbConn return self._dbConn
else: else:
...@@ -255,7 +253,7 @@ class WorkerThread: ...@@ -255,7 +253,7 @@ class WorkerThread:
class ThreadCoordinator: class ThreadCoordinator:
WORKER_THREAD_TIMEOUT = 60 # one minute WORKER_THREAD_TIMEOUT = 60 # one minute
def __init__(self, pool: ThreadPool, dbManager): def __init__(self, pool: ThreadPool, dbManager: DbManager):
self._curStep = -1 # first step is 0 self._curStep = -1 # first step is 0
self._pool = pool self._pool = pool
# self._wd = wd # self._wd = wd
...@@ -268,6 +266,7 @@ class ThreadCoordinator: ...@@ -268,6 +266,7 @@ class ThreadCoordinator:
self._pool.numThreads + 1) # one barrier for all threads self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats() self._execStats = ExecutionStats()
self._runStatus = MainExec.STATUS_RUNNING self._runStatus = MainExec.STATUS_RUNNING
self._initDbs()
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
...@@ -332,12 +331,16 @@ class ThreadCoordinator: ...@@ -332,12 +331,16 @@ class ThreadCoordinator:
def _doTransition(self): def _doTransition(self):
transitionFailed = False transitionFailed = False
try: try:
sm = self._dbManager.getStateMachine() for x in self._dbs:
logger.debug("[STT] starting transitions") db = x # type: Database
# at end of step, transiton the DB state sm = db.getStateMachine()
sm.transition(self._executedTasks) logger.debug("[STT] starting transitions for DB: {}".format(db.getName()))
logger.debug("[STT] transition ended") # at end of step, transiton the DB state
# Due to limitation (or maybe not) of the Python library, tasksForDb = db.filterTasks(self._executedTasks)
sm.transition(tasksForDb, self.getDbManager().getDbConn())
logger.debug("[STT] transition ended for DB: {}".format(db.getName()))
# Due to limitation (or maybe not) of the TD Python library,
# we cannot share connections across threads # we cannot share connections across threads
# Here we are in main thread, we cannot operate the connections created in workers # Here we are in main thread, we cannot operate the connections created in workers
# Moving below to task loop # Moving below to task loop
...@@ -347,6 +350,7 @@ class ThreadCoordinator: ...@@ -347,6 +350,7 @@ class ThreadCoordinator:
# t.useDb() # t.useDb()
# t.execSql("use db") # main thread executing "use # t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread # db" on behalf of every worker thread
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
if (err.msg == 'network unavailable'): # broken DB connection if (err.msg == 'network unavailable'): # broken DB connection
logger.info("DB connection broken, execution failed") logger.info("DB connection broken, execution failed")
...@@ -458,23 +462,34 @@ class ThreadCoordinator: ...@@ -458,23 +462,34 @@ class ThreadCoordinator:
def isRunning(self): def isRunning(self):
return self._te is not None return self._te is not None
def _initDbs(self):
''' Initialize multiple databases, invoked at __ini__() time '''
self._dbs = [] # type: List[Database]
dbc = self.getDbManager().getDbConn()
if gConfig.max_dbs == 0:
self._dbs.append(Database(0, dbc))
else:
for i in range(gConfig.max_dbs):
self._dbs.append(Database(i, dbc))
def pickDatabase(self):
idxDb = 0
if gConfig.max_dbs != 0 :
idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1
db = self._dbs[idxDb] # type: Database
return db
def fetchTask(self) -> Task: def fetchTask(self) -> Task:
''' The thread coordinator (that's us) is responsible for fetching a task
to be executed next.
'''
if (not self.isRunning()): # no task if (not self.isRunning()): # no task
raise RuntimeError("Cannot fetch task when not running") raise RuntimeError("Cannot fetch task when not running")
# return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task
# dbState = self.getDbState()
# tasks = dbState.getTasksAtState() # TODO: create every time?
# nTasks = len(tasks)
# i = Dice.throw(nTasks)
# logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary?
# pick a task type for current state # pick a task type for current state
taskType = self.getDbManager().getStateMachine().pickTaskType() db = self.pickDatabase()
return taskType( taskType = db.getStateMachine().pickTaskType() # type: Task
self.getDbManager(), return taskType(self._execStats, db) # create a task from it
self._execStats) # create a task from it
def resetExecutedTasks(self): def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread self._executedTasks = [] # should be under single thread
...@@ -632,17 +647,6 @@ class DbConn: ...@@ -632,17 +647,6 @@ class DbConn:
logger.debug("[DB] data connection opened, type = {}".format(self._type)) logger.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True self.isOpen = True
def resetDb(self): # reset the whole database, etc.
if (not self.isOpen):
raise RuntimeError("Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc.
self.execute('drop database if exists db')
logger.debug("Resetting DB, dropped database")
# self._cursor.execute('create database db')
# self._cursor.execute('use db')
# tdSql.execute('show databases')
def queryScalar(self, sql) -> int: def queryScalar(self, sql) -> int:
return self._queryAny(sql) return self._queryAny(sql)
...@@ -662,16 +666,32 @@ class DbConn: ...@@ -662,16 +666,32 @@ class DbConn:
def use(self, dbName): def use(self, dbName):
self.execute("use {}".format(dbName)) self.execute("use {}".format(dbName))
def hasDatabases(self): def existsDatabase(self, dbName: str):
return self.query("show databases") > 1 # We now have a "log" database by default ''' Check if a certain database exists '''
self.query("show databases")
dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
# ret2 = dbName in dbs
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here
def hasTables(self): def hasTables(self):
return self.query("show tables") > 0 return self.query("show tables") > 0
def execute(self, sql): def execute(self, sql):
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
def safeExecute(self, sql):
'''Safely execute any SQL query, returning True/False upon success/failure'''
try:
self.execute(sql)
return True # ignore num of results, return success
except taos.error.ProgrammingError as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
def query(self, sql) -> int: # return num rows returned def query(self, sql) -> int: # return num rows returned
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self): def openByType(self):
...@@ -922,7 +942,9 @@ class AnyState: ...@@ -922,7 +942,9 @@ class AnyState:
STATE_VAL_IDX = 0 STATE_VAL_IDX = 0
CAN_CREATE_DB = 1 CAN_CREATE_DB = 1
CAN_DROP_DB = 2 # For below, if we can "drop the DB", but strictly speaking
# only "under normal circumstances", as we may override it with the -b option
CAN_DROP_DB = 2
CAN_CREATE_FIXED_SUPER_TABLE = 3 CAN_CREATE_FIXED_SUPER_TABLE = 3
CAN_DROP_FIXED_SUPER_TABLE = 4 CAN_DROP_FIXED_SUPER_TABLE = 4
CAN_ADD_DATA = 5 CAN_ADD_DATA = 5
...@@ -935,6 +957,8 @@ class AnyState: ...@@ -935,6 +957,8 @@ class AnyState:
# -1 hack to accomodate the STATE_INVALID case # -1 hack to accomodate the STATE_INVALID case
return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]
# Each sub state tells us the "info", about itself, so we can determine
# on things like canDropDB()
def getInfo(self): def getInfo(self):
raise RuntimeError("Must be overriden by child classes") raise RuntimeError("Must be overriden by child classes")
...@@ -961,6 +985,10 @@ class AnyState: ...@@ -961,6 +985,10 @@ class AnyState:
return self._info[self.CAN_CREATE_DB] return self._info[self.CAN_CREATE_DB]
def canDropDb(self): def canDropDb(self):
# If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more
if gConfig.max_dbs > 0 :
return False
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
def canCreateFixedSuperTable(self): def canCreateFixedSuperTable(self):
...@@ -1145,13 +1173,16 @@ class StateHasData(AnyState): ...@@ -1145,13 +1173,16 @@ class StateHasData(AnyState):
class StateMechine: class StateMechine:
def __init__(self, dbConn): def __init__(self, db: Database):
self._dbConn = dbConn self._db = db
self._curState = self._findCurrentState() # starting state # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
# transitition target probabilities, indexed with value of STATE_EMPTY,
# STATE_DB_ONLY, etc.
self._stateWeights = [1, 2, 10, 40] self._stateWeights = [1, 2, 10, 40]
def init(self, dbc: DbConn): # late initailization, don't save the dbConn
self._curState = self._findCurrentState(dbc) # starting state
logger.debug("Found Starting State: {}".format(self._curState))
# TODO: seems no lnoger used, remove?
def getCurrentState(self): def getCurrentState(self):
return self._curState return self._curState
...@@ -1193,34 +1224,35 @@ class StateMechine: ...@@ -1193,34 +1224,35 @@ class StateMechine:
typesToStrings(taskTypes))) typesToStrings(taskTypes)))
return taskTypes return taskTypes
def _findCurrentState(self): def _findCurrentState(self, dbc: DbConn):
dbc = self._dbConn
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
if not dbc.hasDatabases(): # no database?! dbName =self._db.getName()
if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?!
logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty() return StateEmpty()
# did not do this when openning connection, and this is NOT the worker # did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own # thread, which does this on their own
dbc.use("db") dbc.use(dbName)
if not dbc.hasTables(): # no tables if not dbc.hasTables(): # no tables
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly() return StateDbOnly()
sTable = DbManager.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc): # no regular tables if sTable.hasRegTables(dbc, dbName): # no regular tables
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly() return StateSuperTableOnly()
else: # has actual tables else: # has actual tables
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
return StateHasData() return StateHasData()
def transition(self, tasks): # We transition the system to a new state by examining the current state itself
def transition(self, tasks, dbc: DbConn):
if (len(tasks) == 0): # before 1st step, or otherwise empty if (len(tasks) == 0): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState)) logger.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing return # do nothing
# this should show up in the server log, separating steps # this should show up in the server log, separating steps
self._dbConn.execute("show dnodes") dbc.execute("show dnodes")
# Generic Checks, first based on the start state # Generic Checks, first based on the start state
if self._curState.canCreateDb(): if self._curState.canCreateDb():
...@@ -1251,7 +1283,7 @@ class StateMechine: ...@@ -1251,7 +1283,7 @@ class StateMechine:
# if self._state.canReadData(): # if self._state.canReadData():
# Nothing for sure # Nothing for sure
newState = self._findCurrentState() newState = self._findCurrentState(dbc)
logger.debug("[STT] New DB state determined: {}".format(newState)) logger.debug("[STT] New DB state determined: {}".format(newState))
# can old state move to new state through the tasks? # can old state move to new state through the tasks?
self._curState.verifyTasksToState(tasks, newState) self._curState.verifyTasksToState(tasks, newState)
...@@ -1283,49 +1315,51 @@ class StateMechine: ...@@ -1283,49 +1315,51 @@ class StateMechine:
if rnd < 0: if rnd < 0:
return i return i
# Manager of the Database Data/Connection
class Database:
''' We use this to represent an actual TDengine database inside a service instance,
possibly in a cluster environment.
For now we use it to manage state transitions in that database
'''
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose
self._stateMachine = StateMechine(self)
self._stateMachine.init(dbc)
class DbManager():
def __init__(self, resetDb=True):
self.tableNumQueue = LinearQueue()
# datetime.datetime(2019, 1, 1) # initial date time tick
self._lastTick = self.setupLastTick() self._lastTick = self.setupLastTick()
self._lastInt = 0 # next one is initial integer self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock() self._lock = threading.RLock()
# self.openDbServerConnection() def getStateMachine(self) -> StateMechine:
self._dbConn = DbConn.createNative() if ( return self._stateMachine
gConfig.connector_type == 'native') else DbConn.createRest()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg))
raise
except BaseException:
print("[=] Unexpected exception")
raise
if resetDb: def getDbNum(self):
self._dbConn.resetDb() # drop and recreate DB return self._dbNum
# Do this after dbConn is in proper shape def getName(self):
self._stateMachine = StateMechine(self._dbConn) return "db_{}".format(self._dbNum)
def getDbConn(self): def filterTasks(self, inTasks: List[Task]): # Pick out those belonging to us
return self._dbConn outTasks = []
for task in inTasks:
if task.getDb().isSame(self):
outTasks.append(task)
return outTasks
def getStateMachine(self) -> StateMechine: def isSame(self, other):
return self._stateMachine return self._dbNum == other._dbNum
def exists(self, dbc: DbConn):
return dbc.existsDatabase(self.getName())
# def getState(self): @classmethod
# return self._stateMachine.getCurrentState() def getFixedSuperTableName(cls):
return "fs_table"
@classmethod
def getFixedSuperTable(cls) -> TdSuperTable:
return TdSuperTable(cls.getFixedSuperTableName())
# We aim to create a starting time tick, such that, whenever we run our test here once # We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp # We should be able to safely create 100,000 records, which will not have any repeated time stamp
...@@ -1347,25 +1381,6 @@ class DbManager(): ...@@ -1347,25 +1381,6 @@ class DbManager():
logger.info("Setting up TICKS to start from: {}".format(t4)) logger.info("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
@classmethod
def getFixedSuperTableName(cls):
return "fs_table"
@classmethod
def getFixedSuperTable(cls):
return TdSuperTable(cls.getFixedSuperTableName())
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
def getNextTick(self): def getNextTick(self):
with self._lock: # prevent duplicate tick with self._lock: # prevent duplicate tick
if Dice.throw(20) == 0: # 1 in 20 chance if Dice.throw(20) == 0: # 1 in 20 chance
...@@ -1389,6 +1404,55 @@ class DbManager(): ...@@ -1389,6 +1404,55 @@ class DbManager():
# print("Float obtained: {}".format(ret)) # print("Float obtained: {}".format(ret))
return ret return ret
class DbManager():
''' This is a wrapper around DbConn(), to make it easier to use.
TODO: rename this to DbConnManager
'''
def __init__(self):
self.tableNumQueue = LinearQueue() # TODO: delete?
# self.openDbServerConnection()
self._dbConn = DbConn.createNative() if (
gConfig.connector_type == 'native') else DbConn.createRest()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
print("Failed to connect to DB, errno = {}, msg: {}"
.format(Helper.convertErrno(err.errno), err.msg))
raise
except BaseException:
print("[=] Unexpected exception")
raise
# Do this after dbConn is in proper shape
# Moved to Database()
# self._stateMachine = StateMechine(self._dbConn)
def getDbConn(self):
return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self): def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition! tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false if (not tblNum): # maybe false
...@@ -1399,7 +1463,6 @@ class DbManager(): ...@@ -1399,7 +1463,6 @@ class DbManager():
def cleanUp(self): def cleanUp(self):
self._dbConn.close() self._dbConn.close()
class TaskExecutor(): class TaskExecutor():
class BoundedList: class BoundedList:
def __init__(self, size=10): def __init__(self, size=10):
...@@ -1465,6 +1528,10 @@ class TaskExecutor(): ...@@ -1465,6 +1528,10 @@ class TaskExecutor():
class Task(): class Task():
''' A generic "Task" to be executed. For now we decide that there is no
need to embed a DB connection here, we use whatever the Worker Thread has
instead. But a task is always associated with a DB
'''
taskSn = 100 taskSn = 100
@classmethod @classmethod
...@@ -1473,10 +1540,9 @@ class Task(): ...@@ -1473,10 +1540,9 @@ class Task():
# logger.debug("Allocating taskSN: {}".format(Task.taskSn)) # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
return Task.taskSn return Task.taskSn
def __init__(self, dbManager: DbManager, execStats: ExecutionStats): def __init__(self, execStats: ExecutionStats, db: Database):
self._dbManager = dbManager
self._workerThread = None self._workerThread = None
self._err = None self._err = None # type: Exception
self._aborted = False self._aborted = False
self._curStep = None self._curStep = None
self._numRows = None # Number of rows affected self._numRows = None # Number of rows affected
...@@ -1486,6 +1552,7 @@ class Task(): ...@@ -1486,6 +1552,7 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum)) # logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats self._execStats = execStats
self._db = db # A task is always associated/for a specific DB
def isSuccess(self): def isSuccess(self):
return self._err is None return self._err is None
...@@ -1494,9 +1561,12 @@ class Task(): ...@@ -1494,9 +1561,12 @@ class Task():
return self._aborted return self._aborted
def clone(self): # TODO: why do we need this again? def clone(self): # TODO: why do we need this again?
newTask = self.__class__(self._dbManager, self._execStats) newTask = self.__class__(self._execStats, self._db)
return newTask return newTask
def getDb(self):
return self._db
def logDebug(self, msg): def logDebug(self, msg):
self._workerThread.logDebug( self._workerThread.logDebug(
"Step[{}.{}] {}".format( "Step[{}.{}] {}".format(
...@@ -1555,9 +1625,12 @@ class Task(): ...@@ -1555,9 +1625,12 @@ class Task():
self.logDebug( self.logDebug(
"[-] executing task {}...".format(self.__class__.__name__)) "[-] executing task {}...".format(self.__class__.__name__))
self._err = None self._err = None # TODO: type hint mess up?
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
errno2 = None errno2 = None
# Now pick a database, and stick with it for the duration of the task execution
dbName = self._db.getName()
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
...@@ -1595,7 +1668,7 @@ class Task(): ...@@ -1595,7 +1668,7 @@ class Task():
self._err = e self._err = e
self._aborted = True self._aborted = True
traceback.print_exc() traceback.print_exc()
except BaseException: except BaseException: # TODO: what is this again??!!
self.logDebug( self.logDebug(
"[=] Unexpected exception, SQL: {}".format( "[=] Unexpected exception, SQL: {}".format(
wt.getDbConn().getLastSql())) wt.getDbConn().getLastSql()))
...@@ -1607,10 +1680,9 @@ class Task(): ...@@ -1607,10 +1680,9 @@ class Task():
# TODO: merge with above. # TODO: merge with above.
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2) self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)
def execSql(self, sql): # TODO: refactor away, just provide the dbConn
return self._dbManager.execute(sql)
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
""" Haha """
return wt.execSql(sql) return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
...@@ -1762,9 +1834,10 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1762,9 +1834,10 @@ class TaskCreateDb(StateTransitionTask):
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
return state.canCreateDb() return state.canCreateDb()
# Actually creating the database(es)
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# self.execWtSql(wt, "create database db replica {}".format(Dice.throw(3)+1)) # was: self.execWtSql(wt, "create database db")
self.execWtSql(wt, "create database db") self.execWtSql(wt, "create database {}".format(self._db.getName()))
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
...@@ -1776,10 +1849,9 @@ class TaskDropDb(StateTransitionTask): ...@@ -1776,10 +1849,9 @@ class TaskDropDb(StateTransitionTask):
return state.canDropDb() return state.canDropDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self.execWtSql(wt, "drop database db") self.execWtSql(wt, "drop database {}".format(self._db.getName()))
logger.debug("[OPS] database dropped at {}".format(time.time())) logger.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
...@@ -1790,13 +1862,14 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1790,13 +1862,14 @@ class TaskCreateSuperTable(StateTransitionTask):
return state.canCreateFixedSuperTable() return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not wt.dbInUse(): # no DB yet, to the best of our knowledge if not self._db.exists(wt.getDbConn()):
logger.debug("Skipping task, no DB yet") logger.debug("Skipping task, no DB yet")
return return
sTable = self._dbManager.getFixedSuperTable() sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) sTable.create(wt.getDbConn(), self._db.getName(),
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that # No need to create the regular tables, INSERT will do that
# automatically # automatically
...@@ -1809,17 +1882,20 @@ class TdSuperTable: ...@@ -1809,17 +1882,20 @@ class TdSuperTable:
def getName(self): def getName(self):
return self._stName return self._stName
def create(self, dbc, cols: dict, tags: dict): # TODO: odd semantic, create() method is usually static?
sql = "CREATE TABLE db.{} ({}) TAGS ({})".format( def create(self, dbc, dbName, cols: dict, tags: dict):
'''Creating a super table'''
sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format(
dbName,
self._stName, self._stName,
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]), ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]),
",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) ",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
) )
dbc.execute(sql) dbc.execute(sql)
def getRegTables(self, dbc: DbConn): def getRegTables(self, dbc: DbConn, dbName: str):
try: try:
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
...@@ -1828,20 +1904,20 @@ class TdSuperTable: ...@@ -1828,20 +1904,20 @@ class TdSuperTable:
qr = dbc.getQueryResult() qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn): def hasRegTables(self, dbc: DbConn, dbName: str):
return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0 return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0
def ensureTable(self, dbc: DbConn, regTableName: str): def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str):
sql = "select tbname from db.{} where tbname in ('{}')".format(self._stName, regTableName) sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already if dbc.query(sql) >= 1 : # reg table exists already
return return
sql = "CREATE TABLE {} USING {} tags ({})".format( sql = "CREATE TABLE {}.{} USING {}.{} tags ({})".format(
regTableName, self._stName, self._getTagStrForSql(dbc) dbName, regTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
) )
dbc.execute(sql) dbc.execute(sql)
def _getTagStrForSql(self, dbc) : def _getTagStrForSql(self, dbc, dbName: str) :
tags = self._getTags(dbc) tags = self._getTags(dbc, dbName)
tagStrs = [] tagStrs = []
for tagName in tags: for tagName in tags:
tagType = tags[tagName] tagType = tags[tagName]
...@@ -1855,34 +1931,34 @@ class TdSuperTable: ...@@ -1855,34 +1931,34 @@ class TdSuperTable:
raise RuntimeError("Unexpected tag type: {}".format(tagType)) raise RuntimeError("Unexpected tag type: {}".format(tagType))
return ", ".join(tagStrs) return ", ".join(tagStrs)
def _getTags(self, dbc) -> dict: def _getTags(self, dbc, dbName) -> dict:
dbc.query("DESCRIBE {}".format(self._stName)) dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
stCols = dbc.getQueryResult() stCols = dbc.getQueryResult()
# print(stCols) # print(stCols)
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
# print("Tags retrieved: {}".format(ret)) # print("Tags retrieved: {}".format(ret))
return ret return ret
def addTag(self, dbc, tagName, tagType): def addTag(self, dbc, dbName, tagName, tagType):
if tagName in self._getTags(dbc): # already if tagName in self._getTags(dbc, dbName): # already
return return
# sTable.addTag("extraTag", "int") # sTable.addTag("extraTag", "int")
sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType) sql = "alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType)
dbc.execute(sql) dbc.execute(sql)
def dropTag(self, dbc, tagName): def dropTag(self, dbc, dbName, tagName):
if not tagName in self._getTags(dbc): # don't have this tag if not tagName in self._getTags(dbc, dbName): # don't have this tag
return return
sql = "alter table db.{} drop tag {}".format(self._stName, tagName) sql = "alter table {}.{} drop tag {}".format(dbName, self._stName, tagName)
dbc.execute(sql) dbc.execute(sql)
def changeTag(self, dbc, oldTag, newTag): def changeTag(self, dbc, dbName, oldTag, newTag):
tags = self._getTags(dbc) tags = self._getTags(dbc, dbName)
if not oldTag in tags: # don't have this tag if not oldTag in tags: # don't have this tag
return return
if newTag in tags: # already have this tag if newTag in tags: # already have this tag
return return
sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag) sql = "alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag)
dbc.execute(sql) dbc.execute(sql)
class TaskReadData(StateTransitionTask): class TaskReadData(StateTransitionTask):
...@@ -1895,19 +1971,21 @@ class TaskReadData(StateTransitionTask): ...@@ -1895,19 +1971,21 @@ class TaskReadData(StateTransitionTask):
return state.canReadData() return state.canReadData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTable = self._dbManager.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
if random.randrange( # 1 in 5 chance, simulate a broken connection.
5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations if random.randrange(5) == 0: # TODO: break connection in all situations
wt.getDbConn().close() wt.getDbConn().close()
wt.getDbConn().open() wt.getDbConn().open()
print("_r", end="", flush=True)
dbc = wt.getDbConn() dbc = wt.getDbConn()
for rTbName in sTable.getRegTables(dbc): # regular tables dbName = self._db.getName()
for rTbName in sTable.getRegTables(dbc, dbName): # regular tables
aggExpr = Dice.choice([ aggExpr = Dice.choice([
'*', '*',
'count(*)', 'count(*)',
'avg(speed)', 'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)', 'sum(speed)',
'stddev(speed)', 'stddev(speed)',
...@@ -1929,10 +2007,10 @@ class TaskReadData(StateTransitionTask): ...@@ -1929,10 +2007,10 @@ class TaskReadData(StateTransitionTask):
]) ])
try: try:
# Run the query against the regular table first # Run the query against the regular table first
dbc.execute("select {} from db.{}".format(aggExpr, rTbName)) dbc.execute("select {} from {}.{}".format(aggExpr, dbName, rTbName))
# Then run it against the super table # Then run it against the super table
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName())) dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
...@@ -1948,27 +2026,25 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -1948,27 +2026,25 @@ class TaskDropSuperTable(StateTransitionTask):
return state.canDropFixedSuperTable() return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# 1/2 chance, we'll drop the regular tables one by one, in a randomized # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
# sequence
if Dice.throw(2) == 0: if Dice.throw(2) == 0:
# print("_7_", end="", flush=True)
tblSeq = list(range( tblSeq = list(range(
2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq) random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table" tickOutput = False # if we have spitted out a "d" character for "drop regular table"
isSuccess = True isSuccess = True
for i in tblSeq: for i in tblSeq:
regTableName = self.getRegTableName( regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
i) # "db.reg_table_{}".format(i)
try: try:
self.execWtSql(wt, "drop table {}".format( self.execWtSql(wt, "drop table {}.{}".
regTableName)) # nRows always 0, like MySQL format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
# correcting for strange error number scheme # correcting for strange error number scheme
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
if (errno2 in [0x362]): # mnode invalid table name if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False isSuccess = False
logger.debug( logger.debug("[DB] Acceptable error when dropping a table")
"[DB] Acceptable error when dropping a table")
continue # try to delete next regular table continue # try to delete next regular table
if (not tickOutput): if (not tickOutput):
...@@ -1979,8 +2055,8 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -1979,8 +2055,8 @@ class TaskDropSuperTable(StateTransitionTask):
print("f", end="", flush=True) print("f", end="", flush=True)
# Drop the super table itself # Drop the super table itself
tblName = self._dbManager.getFixedSuperTableName() tblName = self._db.getFixedSuperTableName()
self.execWtSql(wt, "drop table db.{}".format(tblName)) self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
class TaskAlterTags(StateTransitionTask): class TaskAlterTags(StateTransitionTask):
...@@ -1995,19 +2071,20 @@ class TaskAlterTags(StateTransitionTask): ...@@ -1995,19 +2071,20 @@ class TaskAlterTags(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# tblName = self._dbManager.getFixedSuperTableName() # tblName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn() dbc = wt.getDbConn()
sTable = self._dbManager.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
dbName = self._db.getName()
dice = Dice.throw(4) dice = Dice.throw(4)
if dice == 0: if dice == 0:
sTable.addTag(dbc, "extraTag", "int") sTable.addTag(dbc, dbName, "extraTag", "int")
# sql = "alter table db.{} add tag extraTag int".format(tblName) # sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1: elif dice == 1:
sTable.dropTag(dbc, "extraTag") sTable.dropTag(dbc, dbName, "extraTag")
# sql = "alter table db.{} drop tag extraTag".format(tblName) # sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2: elif dice == 2:
sTable.dropTag(dbc, "newTag") sTable.dropTag(dbc, dbName, "newTag")
# sql = "alter table db.{} drop tag newTag".format(tblName) # sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3 else: # dice == 3
sTable.changeTag(dbc, "extraTag", "newTag") sTable.changeTag(dbc, dbName, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName) # sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask): class TaskRestartService(StateTransitionTask):
...@@ -2072,7 +2149,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2072,7 +2149,8 @@ class TaskAddData(StateTransitionTask):
return state.canAddData() return state.canAddData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db
tblSeq = list(range( tblSeq = list(range(
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq) random.shuffle(tblSeq)
...@@ -2082,22 +2160,23 @@ class TaskAddData(StateTransitionTask): ...@@ -2082,22 +2160,23 @@ class TaskAddData(StateTransitionTask):
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
sTable = ds.getFixedSuperTable() sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = ds.getNextInt() nextInt = db.getNextInt()
if gConfig.record_ops: if gConfig.record_ops:
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.flush() self.fAddLogReady.flush()
os.fsync(self.fAddLogReady) os.fsync(self.fAddLogReady)
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {})
db.getName(),
regTableName, regTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt) db.getNextTick(), nextInt)
self.execWtSql(wt, sql) self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it # Successfully wrote the data into the DB, let's record it
# somehow # somehow
...@@ -2688,40 +2767,38 @@ class ClientManager: ...@@ -2688,40 +2767,38 @@ class ClientManager:
self.inSigHandler = False self.inSigHandler = False
def _printLastNumbers(self): # to verify data durability # TODO: need to revise how we verify data durability
dbManager = DbManager(resetDb=False) # def _printLastNumbers(self): # to verify data durability
dbc = dbManager.getDbConn() # dbManager = DbManager()
if dbc.query("show databases") <= 1: # no database (we have a default called "log") # dbc = dbManager.getDbConn()
return # if dbc.query("show databases") <= 1: # no database (we have a default called "log")
dbc.execute("use db") # return
if dbc.query("show tables") == 0: # no tables # dbc.execute("use db")
return # if dbc.query("show tables") == 0: # no tables
# return
sTbName = dbManager.getFixedSuperTableName()
# sTbName = dbManager.getFixedSuperTableName()
# get all regular tables
# TODO: analyze result set later # # get all regular tables
dbc.query("select TBNAME from db.{}".format(sTbName)) # # TODO: analyze result set later
rTables = dbc.getQueryResult() # dbc.query("select TBNAME from db.{}".format(sTbName))
# rTables = dbc.getQueryResult()
bList = TaskExecutor.BoundedList()
for rTbName in rTables: # regular tables # bList = TaskExecutor.BoundedList()
dbc.query("select speed from db.{}".format(rTbName[0])) # for rTbName in rTables: # regular tables
numbers = dbc.getQueryResult() # dbc.query("select speed from db.{}".format(rTbName[0]))
for row in numbers: # numbers = dbc.getQueryResult()
# print("<{}>".format(n), end="", flush=True) # for row in numbers:
bList.add(row[0]) # # print("<{}>".format(n), end="", flush=True)
# bList.add(row[0])
print("Top numbers in DB right now: {}".format(bList))
print("TDengine client execution is about to start in 2 seconds...") # print("Top numbers in DB right now: {}".format(bList))
time.sleep(2.0) # print("TDengine client execution is about to start in 2 seconds...")
dbManager = None # release? # time.sleep(2.0)
# dbManager = None # release?
def prepare(self):
self._printLastNumbers()
def run(self, svcMgr): def run(self, svcMgr):
self._printLastNumbers() # self._printLastNumbers()
dbManager = DbManager() # Regular function dbManager = DbManager() # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
...@@ -2876,6 +2953,13 @@ def main(): ...@@ -2876,6 +2953,13 @@ def main():
'--auto-start-service', '--auto-start-service',
action='store_true', action='store_true',
help='Automatically start/stop the TDengine service (default: false)') help='Automatically start/stop the TDengine service (default: false)')
parser.add_argument(
'-b',
'--max-dbs',
action='store',
default=0,
type=int,
help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
parser.add_argument( parser.add_argument(
'-c', '-c',
'--connector-type', '--connector-type',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册