diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen_main.py similarity index 97% rename from tests/pytest/crash_gen/crash_gen.py rename to tests/pytest/crash_gen/crash_gen_main.py index 9bc701aa8b871a65ed2fc78fca3a3ae8c5883831..fee51b98c0ae98e30fcd2cb6c265f60f46f46e3f 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -38,9 +38,9 @@ import resource from guppy import hpy import gc -from .service_manager import ServiceManager, TdeInstance -from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from .db import DbConn, MyTDSql, DbConnNative, DbManager +from crash_gen.service_manager import ServiceManager, TdeInstance +from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress +from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager import taos import requests @@ -435,7 +435,7 @@ class ThreadCoordinator: Logging.debug("\r\n\n--> Main thread ready to finish up...") Logging.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish - Logging.info("\nAll worker threads finished") + Logging.info(". . . All worker threads finished") # No CR/LF before self._execStats.endExec() def cleanup(self): # free resources @@ -1072,17 +1072,18 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - Logging.info("Setting up TICKS to start from: {}".format(t4)) + Logging.debug("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod def getNextTick(cls): with cls._clsLock: # prevent duplicate tick - if cls._lastLaggingTick==0: + if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized # 10k at 1/20 chance, should be enough to avoid overlaps - cls._lastLaggingTick = cls.setupLastTick() + datetime.timedelta(0, -10000) - if cls._lastTick==0: # should be quite a bit into the future - cls._lastTick = cls.setupLastTick() + tick = cls.setupLastTick() + cls._lastTick = tick + cls._lastLaggingTick = tick + datetime.timedelta(0, -10000) + # if : # should be quite a bit into the future if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds @@ -1322,7 +1323,7 @@ class Task(): self._err = err self._aborted = True except Exception as e: - self.logInfo("Non-TAOS exception encountered") + Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__)) self._err = e self._aborted = True traceback.print_exc() @@ -1566,8 +1567,11 @@ class TaskCreateSuperTable(StateTransitionTask): sTable = self._db.getFixedSuperTable() # type: TdSuperTable # wt.execSql("use db") # should always be in place + sTable.create(wt.getDbConn(), self._db.getName(), - {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}) + {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'}, + dropIfExists = True + ) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically @@ -1580,14 +1584,41 @@ class TdSuperTable: def getName(self): return self._stName + def drop(self, dbc, dbName, skipCheck = False): + if self.exists(dbc, dbName) : # if myself exists + fullTableName = dbName + '.' + self._stName + dbc.execute("DROP TABLE {}".format(fullTableName)) + else: + if not skipCheck: + raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName)) + + def exists(self, dbc, dbName): + dbc.execute("USE " + dbName) + return dbc.existsSuperTable(self._stName) + # TODO: odd semantic, create() method is usually static? - def create(self, dbc, dbName, cols: dict, tags: dict): + def create(self, dbc, dbName, cols: dict, tags: dict, + dropIfExists = False + ): + '''Creating a super table''' - sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format( - dbName, - self._stName, - ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]), - ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) + dbc.execute("USE " + dbName) + fullTableName = dbName + '.' + self._stName + if dbc.existsSuperTable(self._stName): + if dropIfExists: + dbc.execute("DROP TABLE {}".format(fullTableName)) + else: # error + raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName)) + + # Now let's create + sql = "CREATE TABLE {} ({})".format( + fullTableName, + ",".join(['%s %s'%(k,v) for (k,v) in cols.items()])) + if tags is None : + sql += " TAGS (dummy int) " + else: + sql += " TAGS ({})".format( + ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) ) dbc.execute(sql) @@ -1611,17 +1642,19 @@ class TdSuperTable: return # acquire a lock first, so as to be able to *verify*. More details in TD-1471 - fullTableName = dbName + '.' + regTableName - task.lockTable(fullTableName) + fullTableName = dbName + '.' + regTableName + if task is not None: # optional lock + task.lockTable(fullTableName) Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table - print("(" + fullTableName[-3:] + ")", end="", flush=True) + # print("(" + fullTableName[-3:] + ")", end="", flush=True) try: sql = "CREATE TABLE {} USING {}.{} tags ({})".format( fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName) ) dbc.execute(sql) finally: - task.unlockTable(fullTableName) # no matter what + if task is not None: + task.unlockTable(fullTableName) # no matter what def _getTagStrForSql(self, dbc, dbName: str) : tags = self._getTags(dbc, dbName) @@ -1840,7 +1873,7 @@ class TaskRestartService(StateTransitionTask): with self._classLock: if self._isRunning: - print("Skipping restart task, another running already") + Logging.info("Skipping restart task, another running already") return self._isRunning = True @@ -1999,7 +2032,7 @@ class ThreadStacks: # stack info for all threads class ClientManager: def __init__(self): - print("Starting service manager") + Logging.info("Starting service manager") # signal.signal(signal.SIGTERM, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler) @@ -2101,7 +2134,7 @@ class ClientManager: thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) - print("Starting client instance to: {}".format(tInst)) + Logging.info("Starting client instance: {}".format(tInst)) self.tc.run() # print("exec stats: {}".format(self.tc.getExecStats())) # print("TC failed = {}".format(self.tc.isFailed())) diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py index 43c855647c03d1de3e55393eb85c77250a00a602..2a4b362f82c4516195becf78ef9771b7c62c3c41 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/db.py @@ -95,6 +95,11 @@ class DbConn: # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName))) return dbName in dbs # TODO: super weird type mangling seen, once here + def existsSuperTable(self, stName): + self.query("show stables") + sts = [v[0] for v in self.getQueryResult()] + return stName in sts + def hasTables(self): return self.query("show tables") > 0 @@ -240,6 +245,7 @@ class MyTDSql: def _execInternal(self, sql): startTime = time.time() + # Logging.debug("Executing SQL: " + sql) ret = self._cursor.execute(sql) # print("\nSQL success: {}".format(sql)) queryTime = time.time() - startTime diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 4dc053dae3c1cf9ca047ff00f39a358137a9ba65..2d2ce99d95582809a8940a8d777bc166b633747c 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -27,7 +27,7 @@ class LoggingFilter(logging.Filter): class MyLoggingAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): - return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs + return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs @@ -51,7 +51,7 @@ class Logging: _logger.addHandler(ch) # Logging adapter, to be used as a logger - print("setting logger variable") + # print("setting logger variable") # global logger cls.logger = MyLoggingAdapter(_logger, []) @@ -166,7 +166,8 @@ class Progress: SERVICE_RECONNECT_START = 4 SERVICE_RECONNECT_SUCCESS = 5 SERVICE_RECONNECT_FAILURE = 6 - CREATE_TABLE_ATTEMPT = 7 + SERVICE_START_NAP = 7 + CREATE_TABLE_ATTEMPT = 8 tokens = { STEP_BOUNDARY: '.', @@ -176,6 +177,7 @@ class Progress: SERVICE_RECONNECT_START: '', SERVICE_RECONNECT_FAILURE: '.xr>', + SERVICE_START_NAP: '_zz', CREATE_TABLE_ATTEMPT: '_c', } diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 196e9d944ab07fa9aa4dce8d73b3dc0ccb4cd168..d249abc4396462b6dbacbfcbb1f6619e48161c3b 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -47,6 +47,17 @@ class TdeInstance(): .format(selfPath, projPath)) return buildPath + @classmethod + def prepareGcovEnv(cls, env): + # Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html + bPath = cls._getBuildPath() # build PATH + numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3 + numSegments = numSegments - 1 # DEBUG only + env['GCOV_PREFIX'] = bPath + '/svc_gcov' + env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings + Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format( + numSegments, env['GCOV_PREFIX'] )) + def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030): self._buildDir = self._getBuildPath() self._subdir = '/' + subdir # TODO: tolerate "/" @@ -217,6 +228,11 @@ class TdeSubProcess: # raise CrashGenError("Empty instance not allowed in TdeSubProcess") # self._tInst = tInst # Default create at ServiceManagerThread + def __repr__(self): + if self.subProcess is None: + return '[TdeSubProc: Empty]' + return '[TdeSubProc: pid = {}]'.format(self.getPid()) + def getStdOut(self): return self.subProcess.stdout @@ -235,17 +251,30 @@ class TdeSubProcess: # Sanity check if self.subProcess: # already there raise RuntimeError("Corrupt process state") - + + # Prepare environment variables for coverage information + # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment + myEnv = os.environ.copy() + TdeInstance.prepareGcovEnv(myEnv) + + # print(myEnv) + # print(myEnv.items()) + # print("Starting TDengine via Shell: {}".format(cmdLineStr)) + + useShell = True self.subProcess = subprocess.Popen( - cmdLine, - shell=False, + ' '.join(cmdLine) if useShell else cmdLine, + shell=useShell, # svcCmdSingle, shell=True, # capture core dump? stdout=subprocess.PIPE, stderr=subprocess.PIPE, # bufsize=1, # not supported in binary mode - close_fds=ON_POSIX + close_fds=ON_POSIX, + env=myEnv ) # had text=True, which interferred with reading EOF + STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process? + def stop(self): """ Stop a sub process, and try to return a meaningful return code. @@ -267,7 +296,7 @@ class TdeSubProcess: SIGUSR2 12 """ if not self.subProcess: - print("Sub process already stopped") + Logging.error("Sub process already stopped") return # -1 retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) @@ -278,20 +307,25 @@ class TdeSubProcess: return retCode # process still alive, let's interrupt it - print("Terminate running process, send SIG_INT and wait...") - # sub process should end, then IPC queue should end, causing IO thread to end - # sig = signal.SIGINT - sig = signal.SIGKILL - self.subProcess.send_signal(sig) # SIGNINT or SIGKILL + Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL)) + # sub process should end, then IPC queue should end, causing IO thread to end + topSubProc = psutil.Process(self.subProcess.pid) + for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False + child.send_signal(self.STOP_SIGNAL) + time.sleep(0.2) # 200 ms + # topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell) + + self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell) self.subProcess.wait(20) retCode = self.subProcess.returncode # should always be there # May throw subprocess.TimeoutExpired exception above, therefore # The process is guranteed to have ended by now self.subProcess = None if retCode != 0: # != (- signal.SIGINT): - Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(sig, retCode)) + Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format( + self.STOP_SIGNAL, retCode)) else: - Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(sig)) + Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL)) return - retCode class ServiceManager: @@ -439,7 +473,7 @@ class ServiceManager: time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round # raise CrashGenError("dummy") - print("Service Manager Thread (with subprocess) ended, main thread exiting...") + Logging.info("Service Manager Thread (with subprocess) ended, main thread exiting...") def _getFirstInstance(self): return self._tInsts[0] @@ -452,7 +486,7 @@ class ServiceManager: # Find if there's already a taosd service, and then kill it for proc in psutil.process_iter(): if proc.name() == 'taosd': - print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") + Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") time.sleep(2.0) proc.kill() # print("Process: {}".format(proc.name())) @@ -559,7 +593,8 @@ class ServiceManagerThread: for i in range(0, 100): time.sleep(1.0) # self.procIpcBatch() # don't pump message during start up - print("_zz_", end="", flush=True) + Progress.emit(Progress.SERVICE_START_NAP) + # print("_zz_", end="", flush=True) if self._status.isRunning(): Logging.info("[] TDengine service READY to process requests") Logging.info("[] TAOS service started: {}".format(self)) @@ -595,12 +630,12 @@ class ServiceManagerThread: def stop(self): # can be called from both main thread or signal handler - print("Terminating TDengine service running as the sub process...") + Logging.info("Terminating TDengine service running as the sub process...") if self.getStatus().isStopped(): - print("Service already stopped") + Logging.info("Service already stopped") return if self.getStatus().isStopping(): - print("Service is already being stopped") + Logging.info("Service is already being stopped") return # Linux will send Control-C generated SIGINT to the TDengine process # already, ref: @@ -616,10 +651,10 @@ class ServiceManagerThread: if retCode == signal.SIGSEGV : # SGV Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") except subprocess.TimeoutExpired as err: - print("Time out waiting for TDengine service process to exit") + Logging.info("Time out waiting for TDengine service process to exit") else: if self._tdeSubProcess.isRunning(): # still running, should now never happen - print("FAILED to stop sub process, it is still running... pid = {}".format( + Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( self._tdeSubProcess.getPid())) else: self._tdeSubProcess = None # not running any more @@ -683,9 +718,9 @@ class ServiceManagerThread: return # we are done with THIS BATCH else: # got line, printing out if forceOutput: - Logging.info(line) + Logging.info('[TAOSD] ' + line) else: - Logging.debug(line) + Logging.debug('[TAOSD] ' + line) print(">", end="", flush=True) _ProgressBars = ["--", "//", "||", "\\\\"] @@ -728,11 +763,11 @@ class ServiceManagerThread: # queue.put(line) # meaning sub process must have died - Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self)) + Logging.info("EOF for TDengine STDOUT: {}".format(self)) out.close() def svcErrorReader(self, err: IO, queue): for line in iter(err.readline, b''): - print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) - Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self)) + Logging.info("TDengine STDERR: {}".format(line)) + Logging.info("EOF for TDengine STDERR: {}".format(self)) err.close() \ No newline at end of file diff --git a/tests/pytest/crash_gen_bootstrap.py b/tests/pytest/crash_gen_bootstrap.py index a3417d21a85ec5ea26c7ebc22ffe398fc436eebe..fd12284b9d7782ac7df89c37fcb653ca3bebe82b 100644 --- a/tests/pytest/crash_gen_bootstrap.py +++ b/tests/pytest/crash_gen_bootstrap.py @@ -11,7 +11,7 @@ ################################################################### import sys -from crash_gen.crash_gen import MainExec +from crash_gen.crash_gen_main import MainExec if __name__ == "__main__":