diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index 48196ab383c974b5c5d3f5ebc54773cd846353e6..b1d79f54c30d86eb92e65c0732add8b9ad98f636 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -44,6 +44,7 @@ import traceback import resource from guppy import hpy import gc +import subprocess try: import psutil @@ -59,12 +60,13 @@ if sys.version_info[0] < 3: # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace -gConfig = argparse.Namespace() # Dummy value, will be replaced later -gSvcMgr = None # TODO: refactor this hack, use dep injection -logger = None # type: Logger +gConfig: argparse.Namespace +gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection +logger: logging.Logger +gContainer: Container -def runThread(wt: WorkerThread): - wt.run() +# def runThread(wt: WorkerThread): +# wt.run() class CrashGenError(Exception): def __init__(self, msg=None, errno=None): @@ -74,7 +76,6 @@ class CrashGenError(Exception): def __str__(self): return self.msg - class WorkerThread: def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, # te: TaskExecutor, @@ -84,7 +85,8 @@ class WorkerThread: self._tid = tid self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() - self._thread = threading.Thread(target=runThread, args=(self,)) + # self._thread = threading.Thread(target=runThread, args=(self,)) + self._thread = threading.Thread(target=self.run) self._stepGate = threading.Event() # Let us have a DB connection of our own @@ -253,7 +255,7 @@ class WorkerThread: class ThreadCoordinator: - WORKER_THREAD_TIMEOUT = 60 # one minute + WORKER_THREAD_TIMEOUT = 180 # one minute def __init__(self, pool: ThreadPool, dbManager: DbManager): self._curStep = -1 # first step is 0 @@ -882,20 +884,15 @@ class MyTDSql: raise return self.affectedRows +class TdeInstance(): + """ + A class to capture the *static* information of a TDengine instance, + including the location of the various files/directories, and basica + configuration. + """ -class DbConnNative(DbConn): - # Class variables - _lock = threading.Lock() - _connInfoDisplayed = False - totalConnections = 0 # Not private - - def __init__(self): - super().__init__() - self._type = self.TYPE_NATIVE - self._conn = None - # self._cursor = None - - def getBuildPath(self): + @classmethod + def _getBuildPath(cls): selfPath = os.path.dirname(os.path.realpath(__file__)) if ("community" in selfPath): projPath = selfPath[:selfPath.find("communit")] @@ -914,10 +911,118 @@ class DbConnNative(DbConn): .format(selfPath, projPath)) return buildPath + def __init__(self, subdir='test'): + self._buildDir = self._getBuildPath() + self._subdir = '/' + subdir # TODO: tolerate "/" + + def __repr__(self): + return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir) + def generateCfgFile(self): + # buildPath = self.getBuildPath() + # taosdPath = self._buildPath + "/build/bin/taosd" + + cfgDir = self.getCfgDir() + cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed + if os.path.exists(cfgFile): + if os.path.isfile(cfgFile): + logger.warning("Config file exists already, skip creation: {}".format(cfgFile)) + return # cfg file already exists, nothing to do + else: + raise CrashGenError("Invalid config file: {}".format(cfgFile)) + # Now that the cfg file doesn't exist + if os.path.exists(cfgDir): + if not os.path.isdir(cfgDir): + raise CrashGenError("Invalid config dir: {}".format(cfgDir)) + # else: good path + else: + os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p" + # Now we have a good cfg dir + cfgValues = { + 'runDir': self.getRunDir(), + 'ip': '127.0.0.1', # TODO: change to a network addressable ip + 'port': 6030, + } + cfgTemplate = """ +dataDir {runDir}/data +logDir {runDir}/log + +charset UTF-8 + +firstEp {ip}:{port} +fqdn {ip} +serverPort {port} + +# was all 135 below +dDebugFlag 135 +cDebugFlag 135 +rpcDebugFlag 135 +qDebugFlag 135 +# httpDebugFlag 143 +# asyncLog 0 +# tables 10 +maxtablesPerVnode 10 +rpcMaxTime 101 +# cache 2 +keep 36500 +# walLevel 2 +walLevel 1 +# +# maxConnections 100 +""" + cfgContent = cfgTemplate.format_map(cfgValues) + f = open(cfgFile, "w") + f.write(cfgContent) + f.close() + + def rotateLogs(self): + logPath = self.getLogDir() + # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 + if os.path.exists(logPath): + logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') + logger.info("Saving old log files to: {}".format(logPathSaved)) + os.rename(logPath, logPathSaved) + # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms + + + def getExecFile(self): # .../taosd + return self._buildDir + "/build/bin/taosd" + + def getRunDir(self): # TODO: rename to "root dir" ?! + return self._buildDir + self._subdir + + def getCfgDir(self): # path, not file + return self.getRunDir() + "/cfg" + + def getLogDir(self): + return self.getRunDir() + "/log" + + def getHostAddr(self): + return "127.0.0.1" + + def getServiceCommand(self): # to start the instance + return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() + + + +class DbConnNative(DbConn): + # Class variables + _lock = threading.Lock() + _connInfoDisplayed = False + totalConnections = 0 # Not private + + def __init__(self): + super().__init__() + self._type = self.TYPE_NATIVE + self._conn = None + # self._cursor = None + def openByType(self): # Open connection - cfgPath = self.getBuildPath() + "/test/cfg" - hostAddr = "127.0.0.1" + global gContainer + tdeInstance = gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance + # cfgPath = self.getBuildPath() + "/test/cfg" + cfgPath = tdeInstance.getCfgDir() + hostAddr = tdeInstance.getHostAddr() cls = self.__class__ # Get the class, to access class variables with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! @@ -1662,7 +1767,7 @@ class Task(): 0x503, 0x510, # vnode not in ready state 0x14, # db not ready, errno changed - 0x600, + 0x600, # Invalid table ID, why? 1000 # REST catch-all error ]: return True # These are the ALWAYS-ACCEPTABLE ones @@ -1824,7 +1929,7 @@ class ExecutionStats: "FAILED (reason: {})".format( self._failureReason) if self._failed else "SUCCEEDED")) logger.info("| Task Execution Times (success/total):") - execTimesAny = 0 + execTimesAny = 0.001 # avoid div by zero for k, n in self._execTimes.items(): execTimesAny += n[0] errStr = None @@ -2343,7 +2448,9 @@ class MyLoggingAdapter(logging.LoggerAdapter): # return '[%s] %s' % (self.extra['connid'], msg), kwargs -class SvcManager: +class ServiceManager: + PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process + def __init__(self): print("Starting TDengine Service Manager") # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec @@ -2384,10 +2491,8 @@ class SvcManager: self.inSigHandler = True choice = self._doMenu() - if choice == "1": - # TODO: can the sub-process be blocked due to us not reading from - # queue? - self.sigHandlerResume() + if choice == "1": + self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? elif choice == "2": self.stopTaosService() elif choice == "3": # Restart @@ -2398,20 +2503,20 @@ class SvcManager: self.inSigHandler = False def sigIntHandler(self, signalNumber, frame): - print("SvcManager: INT Signal Handler starting...") + print("ServiceManager: INT Signal Handler starting...") if self.inSigHandler: print("Ignoring repeated SIG_INT...") return self.inSigHandler = True self.stopTaosService() - print("SvcManager: INT Signal Handler returning...") + print("ServiceManager: INT Signal Handler returning...") self.inSigHandler = False def sigHandlerResume(self): - print("Resuming TDengine service manager thread (main thread)...\n\n") + print("Resuming TDengine service manager (main thread)...\n\n") - def _checkServiceManagerThread(self): + def _updateThreadStatus(self): if self.svcMgrThread: # valid svc mgr thread if self.svcMgrThread.isStopped(): # done? self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate? @@ -2419,14 +2524,13 @@ class SvcManager: def _procIpcAll(self): while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here - if self.isRunning(): + if self.isRunning(): self.svcMgrThread.procIpcBatch() # regular processing, - self._checkServiceManagerThread() + self._updateThreadStatus() elif self.isRetarting(): print("Service restarting...") - time.sleep(0.5) # pause, before next round - print( - "Service Manager Thread (with subprocess) has ended, main thread now exiting...") + time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round + print("Service Manager Thread (with subprocess) ended, main thread exiting...") def startTaosService(self): with self._lock: @@ -2440,7 +2544,6 @@ class SvcManager: time.sleep(2.0) proc.kill() # print("Process: {}".format(proc.name())) - self.svcMgrThread = ServiceManagerThread() # create the object print("Attempting to start TAOS service started, printing out output...") @@ -2491,10 +2594,17 @@ class SvcManager: return self._isRestarting class ServiceManagerThread: + """ + A class representing a dedicated thread which manages the "sub process" + of the TDengine service, interacting with its STDOUT/ERR. + + It takes a TdeInstance parameter at creation time, or create a default + """ MAX_QUEUE_SIZE = 10000 - def __init__(self): + def __init__(self, tInst : TdeInstance = None): self._tdeSubProcess = None # type: TdeSubProcess + self._tInst = tInst or TdeInstance() # Need an instance self._thread = None self._status = None @@ -2521,7 +2631,7 @@ class ServiceManagerThread: self._status = MainExec.STATUS_STARTING - self._tdeSubProcess = TdeSubProcess() + self._tdeSubProcess = TdeSubProcess(self._tInst) self._tdeSubProcess.start() self._ipcQueue = Queue() @@ -2681,8 +2791,19 @@ class ServiceManagerThread: class TdeSubProcess: - def __init__(self): + """ + A class to to represent the actual sub process that is the run-time + of a TDengine instance. + + It takes a TdeInstance object as its parameter, with the rationale being + "a sub process runs an instance". + """ + + def __init__(self, tInst : TdeInstance): self.subProcess = None + if tInst is None: + raise CrashGenError("Empty instance not allowed in TdeSubProcess") + self._tInst = tInst # Default create at ServiceManagerThread def getStdOut(self): return self.subProcess.stdout @@ -2696,50 +2817,39 @@ class TdeSubProcess: def getPid(self): return self.subProcess.pid - def getBuildPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("communit")] - else: - projPath = selfPath[:selfPath.find("tests")] + # Repalced by TdeInstance class + # def getBuildPath(self): + # selfPath = os.path.dirname(os.path.realpath(__file__)) + # if ("community" in selfPath): + # projPath = selfPath[:selfPath.find("communit")] + # else: + # projPath = selfPath[:selfPath.find("tests")] - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root) - len("/build/bin")] - break - return buildPath + # for root, dirs, files in os.walk(projPath): + # if ("taosd" in files): + # rootRealPath = os.path.dirname(os.path.realpath(root)) + # if ("packaging" not in rootRealPath): + # buildPath = root[:len(root) - len("/build/bin")] + # break + # return buildPath def start(self): ON_POSIX = 'posix' in sys.builtin_module_names - taosdPath = self.getBuildPath() + "/build/bin/taosd" - cfgPath = self.getBuildPath() + "/test/cfg" - - # Delete the log files - logPath = self.getBuildPath() + "/test/log" - # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 - # filelist = [ f for f in os.listdir(logPath) ] # if f.endswith(".bak") ] - # for f in filelist: - # filePath = os.path.join(logPath, f) - # print("Removing log file: {}".format(filePath)) - # os.remove(filePath) - if os.path.exists(logPath): - logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') - logger.info("Saving old log files to: {}".format(logPathSaved)) - os.rename(logPath, logPathSaved) - # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms - - svcCmd = [taosdPath, '-c', cfgPath] - # svcCmdSingle = "{} -c {}".format(taosdPath, cfgPath) - # svcCmd = ['vmstat', '1'] + # Sanity check if self.subProcess: # already there raise RuntimeError("Corrupt process state") - # print("Starting service: {}".format(svcCmd)) + # global gContainer + # tInst = gContainer.defTdeInstance = TdeInstance('test3') # creae the instance + self._tInst.generateCfgFile() # service side generates config file, client does not + + self._tInst.rotateLogs() + + print("Starting TDengine instance: {}".format(self._tInst)) self.subProcess = subprocess.Popen( - svcCmd, shell=False, + self._tInst.getServiceCommand(), + shell=False, # svcCmdSingle, shell=True, # capture core dump? stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -2898,10 +3008,15 @@ class ClientManager: # self._printLastNumbers() global gConfig + # Prepare Tde Instance + global gContainer + tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" + dbManager = DbManager() # Regular function thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) + print("Starting client instance to: {}".format(tInst)) self.tc.run() # print("exec stats: {}".format(self.tc.getExecStats())) # print("TC failed = {}".format(self.tc.isFailed())) @@ -2936,9 +3051,6 @@ class ClientManager: # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections self.tc.printStats() - - - class MainExec: STATUS_STARTING = 1 STATUS_RUNNING = 2 @@ -2968,7 +3080,7 @@ class MainExec: def runClient(self): global gSvcMgr if gConfig.auto_start_service: - self._svcMgr = SvcManager() + self._svcMgr = ServiceManager() gSvcMgr = self._svcMgr # hack alert self._svcMgr.startTaosService() # we start, don't run @@ -2983,55 +3095,13 @@ class MainExec: def runService(self): global gSvcMgr - self._svcMgr = SvcManager() + self._svcMgr = ServiceManager() gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert self._svcMgr.run() # run to some end state self._svcMgr = None gSvcMgr = None - def runTemp(self): # for debugging purposes - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - return def main(): @@ -3045,28 +3115,7 @@ def main(): 1. You build TDengine in the top level ./build directory, as described in offical docs 2. You run the server there before this script: ./build/bin/taosd -c test/cfg - ''')) - - # parser.add_argument('-a', '--auto-start-service', action='store_true', - # help='Automatically start/stop the TDengine service (default: false)') - # parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, - # help='Connector type to use: native, rest, or mixed (default: 10)') - # parser.add_argument('-d', '--debug', action='store_true', - # help='Turn on DEBUG mode for more logging (default: false)') - # parser.add_argument('-e', '--run-tdengine', action='store_true', - # help='Run TDengine service in foreground (default: false)') - # parser.add_argument('-l', '--larger-data', action='store_true', - # help='Write larger amount of data during write operations (default: false)') - # parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - # help='Use a single shared db connection (default: false)') - # parser.add_argument('-r', '--record-ops', action='store_true', - # help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - # parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, - # help='Maximum number of steps to run (default: 100)') - # parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, - # help='Number of threads to run (default: 10)') - # parser.add_argument('-x', '--continue-on-exception', action='store_true', - # help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') + ''')) parser.add_argument( '-a', @@ -3171,8 +3220,31 @@ def main(): else: return mExec.runClient() +class Container(): + _propertyList = {'defTdeInstance'} + + def __init__(self): + self._cargo = {} # No cargo at the beginning + + def _verifyValidProperty(self, name): + if not name in self._propertyList: + raise CrashGenError("Invalid container property: {}".format(name)) + + # Called for an attribute, when other mechanisms fail (compare to __getattribute__) + def __getattr__(self, name): + self._verifyValidProperty(name) + return self._cargo[name] # just a simple lookup + + def __setattr__(self, name, value): + if name == '_cargo' : # reserved vars + super().__setattr__(name, value) + return + self._verifyValidProperty(name) + self._cargo[name] = value if __name__ == "__main__": + gContainer = Container() # micky-mouse DI + exitCode = main() # print("Exiting with code: {}".format(exitCode)) - sys.exit(exitCode) + sys.exit(exitCode) \ No newline at end of file