Introduced TdeInstance concept into crash_gen tool, ready to run clusters next

......@@ -44,6 +44,7 @@ import traceback
import resource
from guppy import hpy
import gc
import subprocess
import psutil
......@@ -59,12 +60,13 @@ if sys.version_info[0] < 3:
# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
gConfig = argparse.Namespace() # Dummy value, will be replaced later
gSvcMgr = None # TODO: refactor this hack, use dep injection
logger = None # type: Logger
gConfig: argparse.Namespace
gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection
logger: logging.Logger
gContainer: Container
def runThread(wt: WorkerThread):
# def runThread(wt: WorkerThread):
# wt.run()
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
......@@ -74,7 +76,6 @@ class CrashGenError(Exception):
def __str__(self):
return self.msg
class WorkerThread:
def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
# te: TaskExecutor,
......@@ -84,7 +85,8 @@ class WorkerThread:
self._tid = tid
self._tc = tc # type: ThreadCoordinator
# self.threadIdent = threading.get_ident()
self._thread = threading.Thread(target=runThread, args=(self,))
# self._thread = threading.Thread(target=runThread, args=(self,))
self._thread = threading.Thread(target=self.run)
self._stepGate = threading.Event()
# Let us have a DB connection of our own
......@@ -253,7 +255,7 @@ class WorkerThread:
class ThreadCoordinator:
WORKER_THREAD_TIMEOUT = 60 # one minute
WORKER_THREAD_TIMEOUT = 180 # three minutes
def __init__(self, pool: ThreadPool, dbManager: DbManager):
self._curStep = -1 # first step is 0
......@@ -882,20 +884,15 @@ class MyTDSql:
return self.affectedRows
class TdeInstance():
A class to capture the *static* information of a TDengine instance,
including the location of the various files/directories, and basica
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
totalConnections = 0 # Not private
def __init__(self):
self._type = self.TYPE_NATIVE
self._conn = None
# self._cursor = None
def getBuildPath(self):
def _getBuildPath(cls):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("communit")]
......@@ -914,10 +911,118 @@ class DbConnNative(DbConn):
.format(selfPath, projPath))
return buildPath
def __init__(self, subdir='test'):
self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/"
def __repr__(self):
return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir)
def generateCfgFile(self):
    """Create this instance's taos.cfg unless one already exists.

    The file is written to ``<cfg dir>/taos.cfg``; the cfg directory is
    created (like "mkdir -p") when it is missing. An existing regular file
    is left untouched and only a warning is logged.

    Raises:
        CrashGenError: if the cfg file path exists but is not a regular
            file, or the cfg dir path exists but is not a directory.
    """
    cfgDir = self.getCfgDir()
    cfgFile = cfgDir + "/taos.cfg"  # TODO: inquire if this is fixed
    if os.path.exists(cfgFile):
        if os.path.isfile(cfgFile):
            logger.warning("Config file exists already, skip creation: {}".format(cfgFile))
            return  # cfg file already exists, nothing to do
        raise CrashGenError("Invalid config file: {}".format(cfgFile))

    # The cfg file does not exist yet; make sure its directory is usable
    if os.path.exists(cfgDir) and not os.path.isdir(cfgDir):
        raise CrashGenError("Invalid config dir: {}".format(cfgDir))
    os.makedirs(cfgDir, exist_ok=True)  # like "mkdir -p"

    # Values substituted into the template below
    cfgValues = {
        'runDir': self.getRunDir(),
        'ip': '',  # TODO: change to a network addressable ip
        'port': 6030,
    }
    cfgTemplate = """
dataDir {runDir}/data
logDir {runDir}/log
charset UTF-8
firstEp {ip}:{port}
fqdn {ip}
serverPort {port}
# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
# maxConnections 100
"""
    cfgContent = cfgTemplate.format_map(cfgValues)
    # Context manager guarantees the content is flushed and the handle closed
    with open(cfgFile, "w") as f:
        f.write(cfgContent)
def rotateLogs(self):
    """Move the current log directory aside under a timestamped name.

    Does nothing when the log directory does not exist.
    """
    currentLogDir = self.getLogDir()
    if not os.path.exists(currentLogDir):
        return  # nothing to rotate

    # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
    stamp = time.strftime('%Y-%m-%d-%H-%M-%S')
    archivedLogDir = "_".join((currentLogDir, stamp))
    logger.info("Saving old log files to: {}".format(archivedLogDir))
    os.rename(currentLogDir, archivedLogDir)
    # The directory is deliberately not recreated here: per the original
    # note, TDengine will auto-create it with proper perms.
def getExecFile(self): # .../taosd
    """Return the path of the taosd executable under the build directory."""
    execRelPath = "/build/bin/taosd"
    return self._buildDir + execRelPath
def getRunDir(self): # TODO: rename to "root dir" ?!
    """Return the instance's run directory: the build dir followed by the subdir."""
    return "".join((self._buildDir, self._subdir))
def getCfgDir(self): # path, not file
    """Return the configuration directory, i.e. "<run dir>/cfg"."""
    runDir = self.getRunDir()
    return runDir + "/cfg"
def getLogDir(self):
    """Return the log directory, i.e. "<run dir>/log"."""
    return "".join((self.getRunDir(), "/log"))
def getHostAddr(self):
    """Return the host address of this instance; currently always ""."""
    hostAddr = ""  # NOTE(review): presumably means "local/default host" to the connector - confirm
    return hostAddr
def getServiceCommand(self): # to start the instance
    """Return the argument list that launches taosd with this instance's cfg dir.

    The result is a list (executable, '-c', cfg dir), the form used with
    subprocess.Popen().
    """
    execFile = self.getExecFile()
    cfgDir = self.getCfgDir()
    return [execFile, '-c', cfgDir]
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
totalConnections = 0 # Not private
def __init__(self):
self._type = self.TYPE_NATIVE
self._conn = None
# self._cursor = None
def openByType(self): # Open connection
cfgPath = self.getBuildPath() + "/test/cfg"
hostAddr = ""
global gContainer
tdeInstance = gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
# cfgPath = self.getBuildPath() + "/test/cfg"
cfgPath = tdeInstance.getCfgDir()
hostAddr = tdeInstance.getHostAddr()
cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
......@@ -1662,7 +1767,7 @@ class Task():
0x510, # vnode not in ready state
0x14, # db not ready, errno changed
0x600, # Invalid table ID, why?
1000 # REST catch-all error
return True # These are the ALWAYS-ACCEPTABLE ones
......@@ -1824,7 +1929,7 @@ class ExecutionStats:
"FAILED (reason: {})".format(
self._failureReason) if self._failed else "SUCCEEDED"))
logger.info("| Task Execution Times (success/total):")
execTimesAny = 0
execTimesAny = 0.001 # avoid div by zero
for k, n in self._execTimes.items():
execTimesAny += n[0]
errStr = None
......@@ -2343,7 +2448,9 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
def __init__(self):
print("Starting TDengine Service Manager")
# signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
......@@ -2384,10 +2491,8 @@ class SvcManager:
self.inSigHandler = True
choice = self._doMenu()
if choice == "1":
# TODO: can the sub-process be blocked due to us not reading from
# queue?
if choice == "1":
self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
elif choice == "2":
elif choice == "3": # Restart
......@@ -2398,20 +2503,20 @@ class SvcManager:
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
print("SvcManager: INT Signal Handler starting...")
print("ServiceManager: INT Signal Handler starting...")
if self.inSigHandler:
print("Ignoring repeated SIG_INT...")
self.inSigHandler = True
print("SvcManager: INT Signal Handler returning...")
print("ServiceManager: INT Signal Handler returning...")
self.inSigHandler = False
def sigHandlerResume(self):
print("Resuming TDengine service manager thread (main thread)...\n\n")
print("Resuming TDengine service manager (main thread)...\n\n")
def _checkServiceManagerThread(self):
def _updateThreadStatus(self):
if self.svcMgrThread: # valid svc mgr thread
if self.svcMgrThread.isStopped(): # done?
self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
......@@ -2419,14 +2524,13 @@ class SvcManager:
def _procIpcAll(self):
while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
if self.isRunning():
if self.isRunning():
self.svcMgrThread.procIpcBatch() # regular processing,
elif self.isRetarting():
print("Service restarting...")
time.sleep(0.5) # pause, before next round
"Service Manager Thread (with subprocess) has ended, main thread now exiting...")
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
print("Service Manager Thread (with subprocess) ended, main thread exiting...")
def startTaosService(self):
with self._lock:
......@@ -2440,7 +2544,6 @@ class SvcManager:
# print("Process: {}".format(proc.name()))
self.svcMgrThread = ServiceManagerThread() # create the object
print("Attempting to start TAOS service started, printing out output...")
......@@ -2491,10 +2594,17 @@ class SvcManager:
return self._isRestarting
class ServiceManagerThread:
A class representing a dedicated thread which manages the "sub process"
of the TDengine service, interacting with its STDOUT/ERR.
It takes a TdeInstance parameter at creation time, or create a default
def __init__(self):
def __init__(self, tInst : TdeInstance = None):
self._tdeSubProcess = None # type: TdeSubProcess
self._tInst = tInst or TdeInstance() # Need an instance
self._thread = None
self._status = None
......@@ -2521,7 +2631,7 @@ class ServiceManagerThread:
self._status = MainExec.STATUS_STARTING
self._tdeSubProcess = TdeSubProcess()
self._tdeSubProcess = TdeSubProcess(self._tInst)
self._ipcQueue = Queue()
......@@ -2681,8 +2791,19 @@ class ServiceManagerThread:
class TdeSubProcess:
def __init__(self):
A class to represent the actual sub process that is the run-time
of a TDengine instance.
It takes a TdeInstance object as its parameter, with the rationale being
"a sub process runs an instance".
def __init__(self, tInst : TdeInstance):
    """Bind this sub process wrapper to the TDengine instance it will run.

    Raises:
        CrashGenError: if tInst is None.
    """
    self.subProcess = None  # no OS process has been launched yet
    if tInst is not None:
        # Callers are expected to supply the instance; the default one is
        # created at ServiceManagerThread.
        self._tInst = tInst
        return
    raise CrashGenError("Empty instance not allowed in TdeSubProcess")
def getStdOut(self):
return self.subProcess.stdout
......@@ -2696,50 +2817,39 @@ class TdeSubProcess:
def getPid(self):
return self.subProcess.pid
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("communit")]
projPath = selfPath[:selfPath.find("tests")]
# Replaced by the TdeInstance class
# def getBuildPath(self):
# selfPath = os.path.dirname(os.path.realpath(__file__))
# if ("community" in selfPath):
# projPath = selfPath[:selfPath.find("communit")]
# else:
# projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
return buildPath
# for root, dirs, files in os.walk(projPath):
# if ("taosd" in files):
# rootRealPath = os.path.dirname(os.path.realpath(root))
# if ("packaging" not in rootRealPath):
# buildPath = root[:len(root) - len("/build/bin")]
# break
# return buildPath
def start(self):
ON_POSIX = 'posix' in sys.builtin_module_names
taosdPath = self.getBuildPath() + "/build/bin/taosd"
cfgPath = self.getBuildPath() + "/test/cfg"
# Delete the log files
logPath = self.getBuildPath() + "/test/log"
# ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
# filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ]
# for f in filelist:
# filePath = os.path.join(logPath, f)
# print("Removing log file: {}".format(filePath))
# os.remove(filePath)
if os.path.exists(logPath):
logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
logger.info("Saving old log files to: {}".format(logPathSaved))
os.rename(logPath, logPathSaved)
# os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
svcCmd = [taosdPath, '-c', cfgPath]
# svcCmdSingle = "{} -c {}".format(taosdPath, cfgPath)
# svcCmd = ['vmstat', '1']
# Sanity check
if self.subProcess: # already there
raise RuntimeError("Corrupt process state")
# print("Starting service: {}".format(svcCmd))
# global gContainer
# tInst = gContainer.defTdeInstance = TdeInstance('test3') # create the instance
self._tInst.generateCfgFile() # service side generates config file, client does not
print("Starting TDengine instance: {}".format(self._tInst))
self.subProcess = subprocess.Popen(
svcCmd, shell=False,
# svcCmdSingle, shell=True, # capture core dump?
......@@ -2898,10 +3008,15 @@ class ClientManager:
# self._printLastNumbers()
global gConfig
# Prepare Tde Instance
global gContainer
tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
dbManager = DbManager() # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
print("Starting client instance to: {}".format(tInst))
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
......@@ -2936,9 +3051,6 @@ class ClientManager:
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
class MainExec:
......@@ -2968,7 +3080,7 @@ class MainExec:
def runClient(self):
global gSvcMgr
if gConfig.auto_start_service:
self._svcMgr = SvcManager()
self._svcMgr = ServiceManager()
gSvcMgr = self._svcMgr # hack alert
self._svcMgr.startTaosService() # we start, don't run
......@@ -2983,55 +3095,13 @@ class MainExec:
def runService(self):
global gSvcMgr
self._svcMgr = SvcManager()
self._svcMgr = ServiceManager()
gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert
self._svcMgr.run() # run to some end state
self._svcMgr = None
gSvcMgr = None
def runTemp(self): # for debugging purposes
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
def main():
......@@ -3045,28 +3115,7 @@ def main():
1. You build TDengine in the top level ./build directory, as described in offical docs
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
# parser.add_argument('-a', '--auto-start-service', action='store_true',
# help='Automatically start/stop the TDengine service (default: false)')
# parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
# help='Connector type to use: native, rest, or mixed (default: 10)')
# parser.add_argument('-d', '--debug', action='store_true',
# help='Turn on DEBUG mode for more logging (default: false)')
# parser.add_argument('-e', '--run-tdengine', action='store_true',
# help='Run TDengine service in foreground (default: false)')
# parser.add_argument('-l', '--larger-data', action='store_true',
# help='Write larger amount of data during write operations (default: false)')
# parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
# help='Use a single shared db connection (default: false)')
# parser.add_argument('-r', '--record-ops', action='store_true',
# help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
# parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
# help='Maximum number of steps to run (default: 100)')
# parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
# help='Number of threads to run (default: 10)')
# parser.add_argument('-x', '--continue-on-exception', action='store_true',
# help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
......@@ -3171,8 +3220,31 @@ def main():
return mExec.runClient()
class Container():
    """A tiny dependency-injection container with a fixed set of properties.

    Values are stored in the private ``_cargo`` dict and exposed as plain
    attributes. Only names listed in ``_propertyList`` may be stored or read.
    """
    _propertyList = {'defTdeInstance'}

    def __init__(self):
        self._cargo = {}  # No cargo at the beginning

    def _verifyValidProperty(self, name):
        """Raise CrashGenError unless name is a declared container property."""
        if name not in self._propertyList:
            raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to __getattribute__)
    def __getattr__(self, name):
        """Look up a declared property in the cargo.

        Raises:
            AttributeError: for private/dunder names (including a not yet
                initialised ``_cargo``) and for declared properties that
                have not been set.
            CrashGenError: for any other name that is not declared.
        """
        if name.startswith('_'):
            # Never consult the cargo for private/dunder names: this keeps
            # protocol probes working and avoids infinite recursion when
            # _cargo itself does not exist yet.
            raise AttributeError(name)
        self._verifyValidProperty(name)
        try:
            return self._cargo[name]  # just a simple lookup
        except KeyError:
            raise AttributeError(
                "Container property not set: {}".format(name)) from None

    def __setattr__(self, name, value):
        """Store a declared property in the cargo.

        Raises:
            CrashGenError: if name is not a declared property.
        """
        if name == '_cargo':  # reserved vars live on the instance itself
            super().__setattr__(name, value)
            return  # do not also store the cargo dict inside itself
        self._verifyValidProperty(name)
        self._cargo[name] = value
# Script entry point: create the module-global container, then run main().
if __name__ == "__main__":
gContainer = Container() # minimal, home-grown dependency injection
# NOTE(review): exitCode is not passed to sys.exit() in the lines visible
# here - confirm whether the process exit status is meant to reflect it.
exitCode = main()
# print("Exiting with code: {}".format(exitCode))
\ No newline at end of file
